Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2010/add_process_batch' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2010] feat: add process_batch

Closes DEX-2010

See merge request nstmrt/rubygems/sbmt-kafka_consumer!49
  • Loading branch information
bibendi committed Jun 4, 2024
2 parents 5a8f752 + ce351d5 commit 34a5bb6
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 83 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.1.0] - 2024-05-13

### Added

- Implemented method `export_batch` for processing messages in batches

## [2.0.1] - 2024-05-08

### Fixed
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ require_relative "config/environment"
some-extra-configuration
```

### `Export batch`

To process messages in batches, define an `export_batch` method in your consumer:

```ruby
# app/consumers/some_consumer.rb
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
def export_batch(messages)
# some code
end
end
```
__CAUTION__:
- ⚠️ Inbox does not support batch insertion.
- ⚠️ If you use this feature, you must process the whole batch atomically (e.g., insert it into ClickHouse in a single request).

## CLI

Run the following command to execute a server
Expand Down Expand Up @@ -259,6 +275,7 @@ Also pay attention to the number of processes of the server:

To test your consumer with Rspec, please use [this shared context](./lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb)

### Testing a single message payload
```ruby
require "sbmt/kafka_consumer/testing"
Expand All @@ -272,6 +289,20 @@ RSpec.describe OrderCreatedConsumer do
end
```

### Testing a batch of payloads
```ruby
require "sbmt/kafka_consumer/testing"
RSpec.describe OrderCreatedConsumer do
include_context "with sbmt karafka consumer"
it "works" do
publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
end
end
```

## Development

1. Prepare environment
Expand Down
37 changes: 35 additions & 2 deletions lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,26 @@ def self.name

# Karafka entry point: invoked once per polled batch of messages.
# All work runs inside the Rails executor so framework state is managed
# per unit of work.
#
# Two processing modes:
# - batch mode: when the concrete consumer defines `export_batch`
#   (see #export_batch?), the whole batch is handed over at once and
#   only the last message is marked as consumed — after the batch
#   succeeds, so a failure reprocesses the entire batch;
# - per-message mode: otherwise each message is instrumented and
#   processed individually via #do_consume.
def consume
  ::Rails.application.executor.wrap do
    if export_batch?
      with_batch_instrumentation(messages) do
        export_batch(messages)
        mark_as_consumed!(messages.last)
      end
    else
      messages.each do |message|
        with_instrumentation(message) { do_consume(message) }
      end
    end
  end
end

# True when the concrete consumer implements `export_batch`, i.e. has
# opted in to whole-batch processing. The answer cannot change during
# the consumer's lifetime, so it is memoized; a plain `||=` would not
# work here because the memoized value may legitimately be `false`.
def export_batch?
  @export_batch_memoized = respond_to?(:export_batch) if @export_batch_memoized.nil?
  @export_batch_memoized
end

private

def with_instrumentation(message)
Expand Down Expand Up @@ -53,6 +67,25 @@ def with_instrumentation(message)
end
end

# Wraps batch processing in tagged logging plus a
# "consumer.consumed_batch" instrumentation event so subscribers
# (tracers, metrics) can observe the whole batch as one unit.
#
# A fresh trace id is generated per batch and attached to both the log
# tags and the event payload, letting logs and spans for the same batch
# be correlated.
def with_batch_instrumentation(messages)
  @trace_id = SecureRandom.base58

  # NOTE(review): the bare `trace_id` calls below presumably read
  # @trace_id through an attr_reader defined outside this excerpt —
  # confirm against the full class.
  logger.tagged(
    trace_id: trace_id,
    first_offset: messages.first.metadata.offset,
    last_offset: messages.last.metadata.offset
  ) do
    ::Sbmt::KafkaConsumer.monitor.instrument(
      "consumer.consumed_batch",
      caller: self,
      messages: messages,
      trace_id: trace_id
    ) do
      yield
    end
  end
end

def do_consume(message)
log_message(message) if log_payload?

Expand Down
1 change: 1 addition & 0 deletions lib/sbmt/kafka_consumer/instrumentation/base_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class BaseMonitor < Karafka::Instrumentation::Monitor
SBMT_KAFKA_CONSUMER_EVENTS = %w[
consumer.consumed_one
consumer.inbox.consumed_one
consumer.consumed_batch
].freeze

def initialize
Expand Down
31 changes: 31 additions & 0 deletions lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def enabled?

def trace(&block)
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
return handle_error(&block) if @event_id == "error.occurred"

Expand All @@ -43,6 +44,23 @@ def handle_consumed_one
end
end

# Traces a whole-batch consume as a single :consumer span, linking it to
# the producer span of every message in the batch whose headers carry a
# valid, extractable trace context.
def handle_consumed_batch
  return yield unless enabled?

  batch_consumer = @payload[:caller]
  batch_messages = @payload[:messages]

  span_links = batch_messages.map do |message|
    extracted_context = ::OpenTelemetry.propagation.extract(
      message.headers,
      getter: ::OpenTelemetry::Context::Propagation.text_map_getter
    )
    remote_span_context = ::OpenTelemetry::Trace.current_span(extracted_context).context
    ::OpenTelemetry::Trace::Link.new(remote_span_context) if remote_span_context.valid?
  end.compact

  tracer.in_span(
    "consume batch",
    links: span_links,
    attributes: batch_attrs(batch_consumer, batch_messages),
    kind: :consumer
  ) { yield }
end

def handle_inbox_consumed_one
return yield unless enabled?

Expand Down Expand Up @@ -92,6 +110,19 @@ def consumer_attrs(consumer, message)
attributes.compact
end

# Builds the OpenTelemetry semantic attributes describing a consumed
# batch. All messages in a Karafka batch come from one topic/partition,
# so topic and consumer-group attributes are read from the first message.
def batch_attrs(consumer, messages)
  first_message = messages.first
  last_message = messages.last

  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => first_message.topic,
    "messaging.destination_kind" => "topic",
    "messaging.kafka.consumer_group" => consumer.topic.consumer_group.id,
    "messaging.batch_size" => messages.count,
    "messaging.first_offset" => first_message.offset,
    "messaging.last_offset" => last_message.offset
  }
  attributes.compact
end

def extract_message_key(key)
# skip encode if already valid utf8
return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)
Expand Down
62 changes: 46 additions & 16 deletions lib/sbmt/kafka_consumer/instrumentation/sentry_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,48 @@ module Instrumentation
class SentryTracer < ::Sbmt::KafkaConsumer::Instrumentation::Tracer
CONSUMER_ERROR_TYPES = %w[
consumer.base.consume_one
consumer.base.consumed_batch
consumer.inbox.consume_one
].freeze

def trace(&block)
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
return handle_error(&block) if @event_id == "error.occurred"

yield
end

def handle_consumed_one
return yield unless ::Sentry.initialized?

consumer = @payload[:caller]
message = @payload[:message]
trace_id = @payload[:trace_id]

scope, transaction = start_transaction(trace_id, consumer, message)

begin
message = {
trace_id: @payload[:trace_id],
topic: @payload[:message].topic,
offset: @payload[:message].offset
}

with_sentry_transaction(
@payload[:caller],
message
) do
yield
rescue
finish_transaction(transaction, 500)
raise
end
end

finish_transaction(transaction, 200)
scope.clear
# Reports a batch consume to Sentry as one transaction, tagged with the
# batch boundaries (first/last offset) instead of a single offset.
def handle_consumed_batch
  batch = @payload[:messages]
  first_message = batch.first

  tags = {
    trace_id: @payload[:trace_id],
    topic: first_message.topic,
    first_offset: first_message.offset,
    last_offset: batch.last.offset
  }

  with_sentry_transaction(@payload[:caller], tags) do
    yield
  end
end

def handle_error
Expand Down Expand Up @@ -64,9 +78,9 @@ def handle_error

private

def start_transaction(trace_id, consumer, message)
def start_transaction(consumer, message)
scope = ::Sentry.get_current_scope
scope.set_tags(trace_id: trace_id, topic: message.topic, offset: message.offset)
scope.set_tags(message)
scope.set_transaction_name("Sbmt/KafkaConsumer/#{consumer.class.name}")

transaction = ::Sentry.start_transaction(name: scope.transaction_name, op: "kafka-consumer")
Expand Down Expand Up @@ -97,6 +111,22 @@ def message_payload(message)
# so in that case we return raw_payload
message.raw_payload
end

# Runs the given block inside a Sentry transaction scoped to the
# consumer. Acts as a plain passthrough when Sentry is not initialized.
# A raised error finishes the transaction with status 500 and is
# re-raised; success finishes it with 200 and clears the scope.
def with_sentry_transaction(consumer, message)
  return yield unless ::Sentry.initialized?

  sentry_scope, sentry_transaction = start_transaction(consumer, message)

  begin
    yield
  rescue
    # Record the failure status before propagating the error.
    finish_transaction(sentry_transaction, 500)
    raise
  else
    finish_transaction(sentry_transaction, 200)
    sentry_scope.clear
  end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@

def publish_to_sbmt_karafka(raw_payload, opts = {})
message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
consumer.messages = Karafka::Messages::Messages.new(
[message],
Karafka::Messages::BatchMetadata.new(
topic: test_topic.name,
partition: 0,
processed_at: Time.zone.now,
created_at: Time.zone.now
)
)
consumer.messages = consumer_messages([message])
end

# Test helper: wraps each raw payload in a Karafka message (applying the
# default metadata merged with any overrides) and assigns the resulting
# batch to the consumer under test.
def publish_to_sbmt_karafka_batch(raw_payloads, opts = {})
  batch = raw_payloads.map do |raw_payload|
    metadata = Karafka::Messages::Metadata.new(metadata_defaults.merge(opts))
    Karafka::Messages::Message.new(raw_payload, metadata)
  end
  consumer.messages = consumer_messages(batch)
end

# @return [Hash] message default options
Expand All @@ -58,4 +57,18 @@ def build_consumer(instance)
instance.singleton_class.include Karafka::Processing::Strategies::Default
instance
end

private

# Builds a Karafka messages batch for the test topic (partition 0),
# stamping batch metadata with the current time.
def consumer_messages(messages)
  batch_metadata = Karafka::Messages::BatchMetadata.new(
    topic: test_topic.name,
    partition: 0,
    processed_at: Time.zone.now,
    created_at: Time.zone.now
  )
  Karafka::Messages::Messages.new(messages, batch_metadata)
end
end
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.0.1"
VERSION = "2.1.0"
end
end
Loading

0 comments on commit 34a5bb6

Please sign in to comment.