diff --git a/CHANGELOG.md b/CHANGELOG.md
index f26d0af..d388941 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Fixed
 
+## [2.1.0] - 2024-05-13
+
+### Added
+
+- Implemented `export_batch` method for processing messages in batches
+
 ## [2.0.1] - 2024-05-08
 
 ### Fixed
diff --git a/README.md b/README.md
index 04317cf..ac2abed 100644
--- a/README.md
+++ b/README.md
@@ -231,6 +231,22 @@ require_relative "config/environment"
 some-extra-configuration
 ```
 
+### `export_batch`
+
+To process messages in batches, define an `export_batch` method in your consumer:
+
+```ruby
+# app/consumers/some_consumer.rb
+class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
+  def export_batch(messages)
+    # some code
+  end
+end
+```
+__CAUTION__:
+- ⚠️ Inbox does not support batch insertion.
+- ⚠️ If you use this feature, process the batch atomically (e.g., insert it into ClickHouse in a single request).
+
 ## CLI
 
 Run the following command to execute a server
@@ -259,6 +275,7 @@ Also pay attention to the number of processes of the server:
 
 To test your consumer with Rspec, please use [this shared context](./lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb)
 
+### For a single message
 ```ruby
 require "sbmt/kafka_consumer/testing"
 
@@ -272,6 +289,20 @@ RSpec.describe OrderCreatedConsumer do
   end
 end
 ```
 
+### For a batch of messages
+```ruby
+require "sbmt/kafka_consumer/testing"
+
+RSpec.describe OrderCreatedConsumer do
+  include_context "with sbmt karafka consumer"
+
+  it "works" do
+    publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
+    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
+  end
+end
+```
+
 ## Development
 
 1. Prepare environment
diff --git a/lib/sbmt/kafka_consumer/base_consumer.rb b/lib/sbmt/kafka_consumer/base_consumer.rb
index 9f86ef9..887338c 100644
--- a/lib/sbmt/kafka_consumer/base_consumer.rb
+++ b/lib/sbmt/kafka_consumer/base_consumer.rb
@@ -17,12 +17,26 @@ def self.name
 
   def consume
     ::Rails.application.executor.wrap do
-      messages.each do |message|
-        with_instrumentation(message) { do_consume(message) }
+      if export_batch?
+        with_batch_instrumentation(messages) do
+          export_batch(messages)
+          mark_as_consumed!(messages.last)
+        end
+      else
+        messages.each do |message|
+          with_instrumentation(message) { do_consume(message) }
+        end
       end
     end
   end
 
+  def export_batch?
+    if @export_batch_memoized.nil?
+      @export_batch_memoized = respond_to?(:export_batch)
+    end
+    @export_batch_memoized
+  end
+
   private
 
   def with_instrumentation(message)
@@ -53,6 +67,25 @@ def with_instrumentation(message)
     end
   end
 
+  def with_batch_instrumentation(messages)
+    @trace_id = SecureRandom.base58
+
+    logger.tagged(
+      trace_id: trace_id,
+      first_offset: messages.first.metadata.offset,
+      last_offset: messages.last.metadata.offset
+    ) do
+      ::Sbmt::KafkaConsumer.monitor.instrument(
+        "consumer.consumed_batch",
+        caller: self,
+        messages: messages,
+        trace_id: trace_id
+      ) do
+        yield
+      end
+    end
+  end
+
   def do_consume(message)
     log_message(message) if log_payload?
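Note on the atomicity caution above: the base class calls `mark_as_consumed!(messages.last)` only after `export_batch` returns, so a failure re-delivers the whole batch. That is why the export itself should be all-or-nothing. A minimal sketch of such an implementation, assuming a hypothetical `CLICKHOUSE` client object with a bulk `insert` method (not part of this change):

```ruby
class OrderEventsConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def export_batch(messages)
    rows = messages.map do |m|
      {topic: m.topic, partition: m.partition, offset: m.offset, payload: m.raw_payload}
    end

    # One request for the whole batch: either every row is written or the
    # exception propagates, the offset is never committed, and the batch
    # is redelivered and retried as a unit.
    CLICKHOUSE.insert("kafka_events", rows)
  end
end
```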
diff --git a/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb b/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb
index c026833..6249d7d 100644
--- a/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb
+++ b/lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb
@@ -9,6 +9,7 @@ class BaseMonitor < Karafka::Instrumentation::Monitor
   SBMT_KAFKA_CONSUMER_EVENTS = %w[
     consumer.consumed_one
     consumer.inbox.consumed_one
+    consumer.consumed_batch
   ].freeze
 
   def initialize
diff --git a/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb b/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
index 2b8cda3..5068cfa 100644
--- a/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
+++ b/lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
@@ -20,6 +20,7 @@ def enabled?
 
       def trace(&block)
         return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
+        return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
        return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
         return handle_error(&block) if @event_id == "error.occurred"
 
@@ -43,6 +44,23 @@ def handle_consumed_one
         end
       end
 
+      def handle_consumed_batch
+        return yield unless enabled?
+
+        consumer = @payload[:caller]
+        messages = @payload[:messages]
+
+        links = messages.filter_map do |m|
+          parent_context = ::OpenTelemetry.propagation.extract(m.headers, getter: ::OpenTelemetry::Context::Propagation.text_map_getter)
+          span_context = ::OpenTelemetry::Trace.current_span(parent_context).context
+          ::OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
+        end
+
+        tracer.in_span("consume batch", links: links, attributes: batch_attrs(consumer, messages), kind: :consumer) do
+          yield
+        end
+      end
+
       def handle_inbox_consumed_one
         return yield unless enabled?
 
@@ -92,6 +110,19 @@ def consumer_attrs(consumer, message)
         attributes.compact
       end
 
+      def batch_attrs(consumer, messages)
+        message = messages.first
+        {
+          "messaging.system" => "kafka",
+          "messaging.destination" => message.topic,
+          "messaging.destination_kind" => "topic",
+          "messaging.kafka.consumer_group" => consumer.topic.consumer_group.id,
+          "messaging.batch_size" => messages.count,
+          "messaging.first_offset" => messages.first.offset,
+          "messaging.last_offset" => messages.last.offset
+        }.compact
+      end
+
       def extract_message_key(key)
         # skip encode if already valid utf8
         return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)
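Because `consumer.consumed_batch` is registered in `SBMT_KAFKA_CONSUMER_EVENTS` above, it can be observed like the existing per-message events. A sketch, assuming `Sbmt::KafkaConsumer.monitor` exposes the `subscribe` API of Karafka's notification bus (verify against your monitor setup):

```ruby
Sbmt::KafkaConsumer.monitor.subscribe("consumer.consumed_batch") do |event|
  messages = event[:messages]
  # Log one line per batch instead of one per message.
  Rails.logger.info(
    "batch consumed: size=#{messages.count} " \
    "offsets=#{messages.first.offset}..#{messages.last.offset}"
  )
end
```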
diff --git a/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb b/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb
index f71b26d..4b83bdb 100644
--- a/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb
+++ b/lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb
@@ -9,34 +9,48 @@ module Instrumentation
     class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
       CONSUMER_ERROR_TYPES = %w[
         consumer.base.consume_one
+        consumer.base.consumed_batch
         consumer.inbox.consume_one
       ].freeze
 
       def trace(&block)
         return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
+        return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
         return handle_error(&block) if @event_id == "error.occurred"
 
         yield
       end
 
       def handle_consumed_one
-        return yield unless ::Sentry.initialized?
-
-        consumer = @payload[:caller]
-        message = @payload[:message]
-        trace_id = @payload[:trace_id]
-
-        scope, transaction = start_transaction(trace_id, consumer, message)
-
-        begin
+        message = {
+          trace_id: @payload[:trace_id],
+          topic: @payload[:message].topic,
+          offset: @payload[:message].offset
+        }
+
+        with_sentry_transaction(
+          @payload[:caller],
+          message
+        ) do
           yield
-        rescue
-          finish_transaction(transaction, 500)
-          raise
         end
+      end
 
-        finish_transaction(transaction, 200)
-        scope.clear
+      def handle_consumed_batch
+        message_first = @payload[:messages].first
+        message = {
+          trace_id: @payload[:trace_id],
+          topic: message_first.topic,
+          first_offset: message_first.offset,
+          last_offset: @payload[:messages].last.offset
+        }
+
+        with_sentry_transaction(
+          @payload[:caller],
+          message
+        ) do
+          yield
+        end
       end
 
       def handle_error
@@ -64,9 +78,9 @@ def handle_error
 
       private
 
-      def start_transaction(trace_id, consumer, message)
+      def start_transaction(consumer, message)
         scope = ::Sentry.get_current_scope
-        scope.set_tags(trace_id: trace_id, topic: message.topic, offset: message.offset)
+        scope.set_tags(message)
         scope.set_transaction_name("Sbmt/KafkaConsumer/#{consumer.class.name}")
 
         transaction = ::Sentry.start_transaction(name: scope.transaction_name, op: "kafka-consumer")
@@ -97,6 +111,22 @@ def message_payload(message)
         # so in that case we return raw_payload
         message.raw_payload
       end
+
+      def with_sentry_transaction(consumer, message)
+        return yield unless ::Sentry.initialized?
+
+        scope, transaction = start_transaction(consumer, message)
+
+        begin
+          yield
+        rescue
+          finish_transaction(transaction, 500)
+          raise
+        end
+
+        finish_transaction(transaction, 200)
+        scope.clear
+      end
     end
   end
 end
diff --git a/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb b/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb
index 5913639..055a37a 100644
--- a/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb
+++ b/lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb
@@ -28,15 +28,14 @@ def publish_to_sbmt_karafka(raw_payload, opts = {})
     message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
 
-    consumer.messages = Karafka::Messages::Messages.new(
-      [message],
-      Karafka::Messages::BatchMetadata.new(
-        topic: test_topic.name,
-        partition: 0,
-        processed_at: Time.zone.now,
-        created_at: Time.zone.now
-      )
-    )
+    consumer.messages = consumer_messages([message])
+  end
+
+  def publish_to_sbmt_karafka_batch(raw_payloads, opts = {})
+    messages = raw_payloads.map do |p|
+      Karafka::Messages::Message.new(p, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
+    end
+    consumer.messages = consumer_messages(messages)
   end
 
   # @return [Hash] message default options
@@ -58,4 +57,18 @@ def build_consumer(instance)
     instance.singleton_class.include Karafka::Processing::Strategies::Default
     instance
   end
+
+  private
+
+  def consumer_messages(messages)
+    Karafka::Messages::Messages.new(
+      messages,
+      Karafka::Messages::BatchMetadata.new(
+        topic: test_topic.name,
+        partition: 0,
+        processed_at: Time.zone.now,
+        created_at: Time.zone.now
+      )
+    )
+  end
 end
diff --git a/lib/sbmt/kafka_consumer/version.rb b/lib/sbmt/kafka_consumer/version.rb
index fcbfee6..c832608 100644
--- a/lib/sbmt/kafka_consumer/version.rb
+++ b/lib/sbmt/kafka_consumer/version.rb
@@ -2,6 +2,6 @@
 
 module Sbmt
   module KafkaConsumer
-    VERSION = "2.0.1"
+    VERSION = "2.1.0"
   end
 end
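The new `publish_to_sbmt_karafka_batch` helper above feeds a whole batch through the shared context in one call. A usage sketch (consumer and model names are illustrative, not from this change); note that `opts` is merged into the metadata of every message, so the helper does not support per-message metadata:

```ruby
require "sbmt/kafka_consumer/testing"

RSpec.describe OrderEventsConsumer do
  include_context "with sbmt karafka consumer"

  it "exports the whole batch in one call" do
    payloads = [{"id" => 1}.to_json, {"id" => 2}.to_json]
    publish_to_sbmt_karafka_batch(payloads, headers: {"Test-Header" => "test-header-value"})

    expect { consume_with_sbmt_karafka }.to change(ExportedEvent, :count).by(2)
  end
end
```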
diff --git a/spec/sbmt/kafka_consumer/base_consumer_spec.rb b/spec/sbmt/kafka_consumer/base_consumer_spec.rb
index cee2dbc..833bd45 100644
--- a/spec/sbmt/kafka_consumer/base_consumer_spec.rb
+++ b/spec/sbmt/kafka_consumer/base_consumer_spec.rb
@@ -5,89 +5,121 @@
 describe Sbmt::KafkaConsumer::BaseConsumer do
   include_context "with sbmt karafka consumer"
 
-  let(:consumer_class) do
-    Class.new(described_class.consumer_klass) do
-      attr_reader :consumed, :consume_count
-
-      def initialize(error: nil, reset_error: true)
-        @error = error
-        @reset_error = reset_error
-        super()
-      end
+  context "when the consumer processes one message at a time" do
+    let(:consumer_class) do
+      Class.new(described_class.consumer_klass) do
+        attr_reader :consumed, :consume_count
+
+        def initialize(error: nil, reset_error: true)
+          @error = error
+          @reset_error = reset_error
+          super()
+        end
 
-      def process_message(_message)
-        @consume_count = @consume_count.to_i + 1
+        def process_message(_message)
+          @consume_count = @consume_count.to_i + 1
+
+          if @error
+            error_to_raise = @error
+            @error = nil if @reset_error
 
-        if @error
-          error_to_raise = @error
-          @error = nil if @reset_error
+            raise error_to_raise, "test error"
+          end
 
-          raise error_to_raise, "test error"
+          @consumed = true
         end
 
-        @consumed = true
+        def consumed?
+          !!@consumed
+        end
       end
+    end
 
-      def consumed?
-        !!@consumed
-      end
+    let(:consumer) { build_consumer(consumer_class.new) }
+
+    let(:payload) { "test-payload" }
+    let(:headers) { {"Test-Header" => "test-header-value"} }
+    let(:key) { "test-key" }
+    let(:consume_error) { nil }
+
+    before do
+      stub_const("Sbmt::KafkaConsumer::BaseConsumer::DEFAULT_RETRY_DELAY_MULTIPLIER", 0)
+      allow(consumer).to receive(:log_payload?).and_return(true)
+      publish_to_sbmt_karafka(payload.to_json, headers: headers, key: key)
     end
-  end
 
-  let(:consumer) { build_consumer(consumer_class.new) }
+    it "consumes" do
+      consume_with_sbmt_karafka
+      expect(consumer).to be_consumed
+    end
 
-  let(:payload) { "test-payload" }
-  let(:headers) { {"Test-Header" => "test-header-value"} }
-  let(:key) { "test-key" }
-  let(:consume_error) { nil }
+    it "logs message" do
+      expect(Rails.logger).to receive(:info).with(/Successfully consumed message/)
+      expect(Rails.logger).to receive(:info).with(/#{payload}/)
 
-  before do
-    stub_const("Sbmt::KafkaConsumer::BaseConsumer::DEFAULT_RETRY_DELAY_MULTIPLIER", 0)
-    allow(consumer).to receive(:log_payload?).and_return(true)
-    publish_to_sbmt_karafka(payload.to_json, headers: headers, key: key)
-  end
+      consume_with_sbmt_karafka
+      expect(consumer).to be_consumed
+    end
 
-  it "consumes" do
-    consume_with_sbmt_karafka
-    expect(consumer).to be_consumed
-  end
+    context "when get active record error" do
+      let(:error) { ActiveRecord::StatementInvalid }
+      let(:consumer) { build_consumer(consumer_class.new(error: error)) }
 
-  it "logs message" do
-    expect(Rails.logger).to receive(:info).with(/Successfully consumed message/)
-    expect(Rails.logger).to receive(:info).with(/#{payload}/)
+      it "tracks error" do
+        allow(Rails.logger).to receive(:error)
 
-    consume_with_sbmt_karafka
-    expect(consumer).to be_consumed
-  end
+        consume_with_sbmt_karafka
+        expect(consumer).not_to be_consumed
+        expect(consumer.consume_count).to eq 1
+      end
+    end
 
-  context "when get active record error" do
-    let(:error) { ActiveRecord::StatementInvalid }
-    let(:consumer) { build_consumer(consumer_class.new(error: error)) }
+    context "when consumer raises exception" do
+      let(:consumer_class) do
+        base_klass = described_class.consumer_klass(skip_on_error: true)
+        Class.new(base_klass) do
+          def process_message(_message)
+            raise "always throws an exception"
+          end
+        end
+      end
+      let(:consumer) { build_consumer(consumer_class.new) }
 
-    it "tracks error" do
-      allow(Rails.logger).to receive(:error)
+      it "skips message if skip_on_error is set" do
+        expect(Rails.logger).to receive(:error).twice
 
-      consume_with_sbmt_karafka
-      expect(consumer).not_to be_consumed
-      expect(consumer.consume_count).to eq 1
+        consume_with_sbmt_karafka
+      end
     end
   end
 
-  context "when consumer raises exception" do
+  context "when the consumer exports messages in batches" do
     let(:consumer_class) do
-      base_klass = described_class.consumer_klass(skip_on_error: true)
-      Class.new(base_klass) do
-        def process_message(_message)
-          raise "always throws an exception"
+      Class.new(described_class.consumer_klass) do
+        attr_reader :consumed
+
+        def export_batch(messages)
+          Rails.logger.info "Export batch #{messages.count} messages"
+          @consumed = true
+        end
+
+        def consumed?
+          !!@consumed
         end
       end
     end
+    let(:consumer) { build_consumer(consumer_class.new) }
+    let(:payload) { "test-payload" }
 
-    it "skips message if skip_on_error is set" do
-      expect(Rails.logger).to receive(:error).twice
+    before do
+      allow(Rails.logger).to receive(:info)
+      publish_to_sbmt_karafka(payload.to_json)
+    end
 
+    it "consumes" do
       consume_with_sbmt_karafka
+      expect(consumer).to be_consumed
+      expect(Rails.logger).to have_received(:info).with(/Export batch/)
     end
   end
 end
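One assertion the batch context above leaves implicit: that the base class commits the offset of the last message after a successful export. A sketch of such a spec under the same shared context (`mark_as_consumed!` is provided by Karafka's consumer API):

```ruby
it "marks the last message in the batch as consumed" do
  allow(consumer).to receive(:mark_as_consumed!)

  consume_with_sbmt_karafka

  expect(consumer).to have_received(:mark_as_consumed!).with(consumer.messages.last)
end
```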
"traces inbox message" do expect(tracer).to receive(:in_span).with("inbox inbox/name process", kind: :consumer, attributes: { "inbox.inbox_name" => "inbox/name", diff --git a/spec/sbmt/kafka_consumer/instrumentation/sentry_tracer_spec.rb b/spec/sbmt/kafka_consumer/instrumentation/sentry_tracer_spec.rb index 48a8ac2..6ea6f42 100644 --- a/spec/sbmt/kafka_consumer/instrumentation/sentry_tracer_spec.rb +++ b/spec/sbmt/kafka_consumer/instrumentation/sentry_tracer_spec.rb @@ -7,7 +7,14 @@ let(:trace_id) { "trace-id" } let(:caller) { double("consumer instance") } let(:message) { OpenStruct.new(topic: "topic", offset: 0, partition: 1, metadata: {topic: "topic"}, payload: "message payload") } + let(:batch_messages) { + [ + OpenStruct.new(topic: "topic", offset: 0, partition: 1, metadata: {topic: "topic"}, payload: "message payload"), + OpenStruct.new(topic: "another_topic", offset: 1, partition: 2, metadata: {topic: "another_topic"}, payload: "another message payload") + ] + } let(:event_payload) { OpenStruct.new(caller: caller, message: message, trace_id: trace_id, type: nil) } + let(:event_payload_with_batch) { OpenStruct.new(caller: caller, messages: batch_messages, trace_id: trace_id, type: nil) } before do allow(caller).to receive(:messages).and_return([message]) @@ -92,6 +99,75 @@ end end + context "when event is consumer.consumed_batch" do + before { allow(Sentry).to receive(:initialized?).and_return(true) } + + it "traces message" do + expect(Sentry).to receive(:get_current_scope).and_return(Sentry::Scope.new) + expect(Sentry).to receive(:start_transaction).and_return(sentry_transaction) + + expect(sentry_transaction).to receive(:is_a?).and_return(Sentry::Span) + expect(sentry_transaction).to receive(:set_http_status).with(200) + expect(sentry_transaction).to receive(:finish) + + described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {} + end + + context "with scope" do + let(:sentry_scope) { instance_double(Sentry::Scope) } + + before do + allow(sentry_scope).to receive(:transaction_name) + allow(sentry_scope).to receive(:set_span) + allow(sentry_scope).to receive(:clear) + allow(sentry_transaction).to receive(:set_http_status) + allow(sentry_transaction).to receive(:finish) + end + + context "when custom consumer class is used" do + let(:custom_class) { stub_const("SomeModule::CustomConsumerClass", Class.new(Sbmt::KafkaConsumer::BaseConsumer)) } + let(:caller) { custom_class.consumer_klass.new } + + it "sets proper params" do + expect(Sentry).to receive(:get_current_scope).and_return(sentry_scope) + expect(Sentry).to receive(:start_transaction).and_return(sentry_transaction) + + expect(sentry_scope).to receive(:set_transaction_name).with("Sbmt/KafkaConsumer/SomeModule::CustomConsumerClass") + expect(sentry_scope).to receive(:set_tags).with(hash_including(first_offset: 0, last_offset: 1, topic: "topic", trace_id: "trace-id")) + + described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {} + end + end + + context "when base consumer class is used" do + let(:caller) { Sbmt::KafkaConsumer::BaseConsumer.consumer_klass.new } + + it "sets proper params" do + expect(Sentry).to receive(:get_current_scope).and_return(sentry_scope) + expect(Sentry).to receive(:start_transaction).and_return(sentry_transaction) + + expect(sentry_scope).to receive(:set_transaction_name).with("Sbmt/KafkaConsumer/Sbmt::KafkaConsumer::BaseConsumer") + expect(sentry_scope).to receive(:set_tags).with(hash_including(first_offset: 0, last_offset: 1, topic: "topic", trace_id: 
"trace-id")) + + described_class.new("consumer.consumed_batch", event_payload_with_batch).trace {} + end + end + end + + it "traces message when error is raised" do + expect(Sentry).to receive(:get_current_scope).and_return(Sentry::Scope.new) + expect(Sentry).to receive(:start_transaction).and_return(sentry_transaction) + + expect(sentry_transaction).to receive(:is_a?).and_return(Sentry::Span) + expect(sentry_transaction).to receive(:set_http_status).with(500) + expect(sentry_transaction).to receive(:finish) + + expect do + described_class.new("consumer.consumed_batch", event_payload_with_batch).trace { raise "error" } + end.to raise_error("error") + end + end + context "when event is error.occurred" do let(:ex) { StandardError.new("error") } let(:sentry_scope) { double("sentry scope") }