Skip to content

Commit 444ad89

Browse files
committed
Use ruby-kafka 1.3.0 or later
ruby-kafka 1.3.0 includes the changes on zendesk/ruby-kafka#846, and they make ruby-kafka-ec2 much simpler.
1 parent a76e446 commit 444ad89

8 files changed

+144
-271
lines changed

README.md

+7-9
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@ Or install it yourself as:
2424

2525
### Kafka::EC2::MixedInstanceAssignmentStrategy
2626

27-
`Kafka::EC2::MixedInstanceAssignmentStrategy` is an assignor for auto-scaling groups with mixed instance policies. The throughputs of consumers usually depend on instance families and availability zones. For example, if your application writes data to a database, the throughputs of consumers running on the same availability zone as the writer DB instance is higher.
27+
`Kafka::EC2::MixedInstanceAssignmentStrategy` is an assignor for auto-scaling groups with mixed instance policies. The throughputs of consumers usually depend on instance families and availability zones. For example, if your application writes data to a database, the throughputs of consumers running on the same availability zone as that of the writer DB instance is higher.
2828

29-
To assign more partitions to consumers with high throughputs, you have to define `Kafka::EC2::MixedInstanceAssignmentStrategyFactory` first like below:
29+
To assign more partitions to consumers with high throughputs, you have to initialize `Kafka::EC2::MixedInstanceAssignmentStrategy` first like below:
3030

3131
```ruby
3232
require "aws-sdk-rds"
3333
require "kafka"
3434
require "kafka/ec2"
3535

3636
rds = Aws::RDS::Client.new(region: "ap-northeast-1")
37-
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
37+
assignment_strategy = Kafka::EC2::MixedInstanceAssignmentStrategy.new(
3838
instance_family_weights: {
3939
"r4" => 1.00,
4040
"r5" => 1.20,
@@ -68,19 +68,17 @@ assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory
6868

6969
In the preceding example, consumers running on c5 instances will have 1.5x as many partitions compared to consumers running on r4 instances. In a similar way, if the writer DB instance is in ap-northeast-1a, consumers in ap-northeast-1a will have 4x as many partitions compared to consumers in ap-northeast-1c.
7070

71-
You can use `Kafka::EC2::MixedInstanceAssignmentStrategy` by specifying the factory to `Kafka::EC2.with_assignment_strategy_factory` and creating a consumer in the block:
71+
You can use `Kafka::EC2::MixedInstanceAssignmentStrategy` by specifying it to `Kafka#consumer`:
7272

7373

7474
```ruby
75-
consumer = Kafka::EC2.with_assignment_strategy_factory(assignment_strategy_factory) do
76-
kafka.consumer(group_id: ENV["KAFKA_CONSUMER_GROUP_ID"])
77-
end
75+
consumer = kafka.consumer(group_id: ENV["KAFKA_CONSUMER_GROUP_ID"], assignment_strategy: assignment_strategy)
7876
```
7977

8078
You can also specify weights for each combination of availability zones and instance families:
8179

8280
```ruby
83-
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
81+
assignment_strategy = Kafka::EC2::MixedInstanceAssignmentStrategy.new(
8482
weights: ->() {
8583
db_cluster = rds.describe_db_clusters(filters: [
8684
{ name: "db-cluster-id", values: [ENV["RDS_CLUSTER"]] },
@@ -121,7 +119,7 @@ assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory
121119
The strategy also has the option `partition_weights`. This is useful when the topic has some skewed partitions. Suppose the partition with ID 0 of the topic "foo" receives twice as many records as other partitions. To reduce the number of partitions assigned to the consumer that consumes the partition with ID 0, specify `partition_weights` like below:
122120

123121
```ruby
124-
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
122+
assignment_strategy = Kafka::EC2::MixedInstanceAssignmentStrategy.new(
125123
partition_weights: {
126124
"foo" => {
127125
0 => 2,

lib/kafka/ec2.rb

+1-17
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,7 @@
1-
require "kafka/ec2/ext/consumer_group"
2-
require "kafka/ec2/ext/protocol/join_group_request"
3-
require "kafka/ec2/mixed_instance_assignment_strategy_factory"
1+
require "kafka/ec2/mixed_instance_assignment_strategy"
42
require "kafka/ec2/version"
53

64
module Kafka
75
class EC2
8-
class << self
9-
attr_reader :assignment_strategy_factory
10-
11-
def with_assignment_strategy_factory(factory)
12-
@assignment_strategy_factory = factory
13-
yield
14-
ensure
15-
@assignment_strategy_factory = nil
16-
end
17-
18-
def assignment_strategy_classes
19-
@assignment_strategy_classes ||= {}
20-
end
21-
end
226
end
237
end

lib/kafka/ec2/ext/consumer_group.rb

-33
This file was deleted.

lib/kafka/ec2/ext/protocol/join_group_request.rb

-39
This file was deleted.

lib/kafka/ec2/mixed_instance_assignment_strategy.rb

+36-36
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
module Kafka
77
class EC2
88
class MixedInstanceAssignmentStrategy
9-
# metadata is a byte sequence created by Kafka::Protocol::ConsumerGroupProtocol.encode
10-
attr_accessor :member_id_to_metadata
9+
DELIMITER = ","
1110

12-
# @param cluster [Kafka::Cluster]
1311
# @param instance_family_weights [Hash{String => Numeric}, Proc] a hash whose the key
1412
# is the instance family and whose value is the weight. If the object is a proc,
1513
# it must returns such a hash and the proc is called every time the method "assign"
@@ -23,22 +21,35 @@ class MixedInstanceAssignmentStrategy
2321
# instance_family_weights or availability_zone_weights. If the object is a proc,
2422
# it must returns such a hash and the proc is called every time the method "assign"
2523
# is called.
26-
def initialize(cluster:, instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {})
27-
@cluster = cluster
24+
def initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {})
2825
@instance_family_weights = instance_family_weights
2926
@availability_zone_weights = availability_zone_weights
3027
@weights = weights
3128
@partition_weights = partition_weights
3229
end
3330

31+
def protocol_name
32+
"mixedinstance"
33+
end
34+
35+
def user_data
36+
Net::HTTP.start("169.254.169.254", 80) do |http|
37+
[
38+
http.get("/latest/meta-data/instance-id").body,
39+
http.get("/latest/meta-data/instance-type").body,
40+
http.get("/latest/meta-data/placement/availability-zone").body,
41+
].join(DELIMITER)
42+
end
43+
end
44+
3445
# Assign the topic partitions to the group members.
3546
#
3647
# @param members [Array<String>] member ids
3748
# @param topics [Array<String>] topics
3849
# @return [Hash{String => Protocol::MemberAssignment}] a hash mapping member
3950
# ids to assignments.
40-
def assign(members:, topics:)
41-
group_assignment = {}
51+
def call(cluster:, members:, partitions:)
52+
member_id_to_partitions = Hash.new { |h, k| h[k] = [] }
4253
instance_id_to_capacity = Hash.new(0)
4354
instance_id_to_member_ids = Hash.new { |h, k| h[k] = [] }
4455
total_capacity = 0
@@ -47,28 +58,17 @@ def assign(members:, topics:)
4758
instance_family_to_capacity = @instance_family_weights.is_a?(Proc) ? @instance_family_weights.call() : @instance_family_weights
4859
az_to_capacity = @availability_zone_weights.is_a?(Proc) ? @availability_zone_weights.call() : @availability_zone_weights
4960
weights = @weights.is_a?(Proc) ? @weights.call() : @weights
50-
members.each do |member_id|
51-
group_assignment[member_id] = Protocol::MemberAssignment.new
52-
53-
instance_id, instance_type, az = member_id_to_metadata[member_id].split(",")
61+
members.each do |member_id, metadata|
62+
instance_id, instance_type, az = metadata.user_data.split(DELIMITER)
5463
instance_id_to_member_ids[instance_id] << member_id
5564
member_id_to_instance_id[member_id] = instance_id
5665
capacity = calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights)
5766
instance_id_to_capacity[instance_id] += capacity
5867
total_capacity += capacity
5968
end
6069

61-
topic_partitions = topics.flat_map do |topic|
62-
begin
63-
partitions = @cluster.partitions_for(topic).map(&:partition_id)
64-
rescue UnknownTopicOrPartition
65-
raise UnknownTopicOrPartition, "unknown topic #{topic}"
66-
end
67-
Array.new(partitions.count) { topic }.zip(partitions)
68-
end
69-
70-
partition_weights = build_partition_weights(topics)
71-
partition_weight_per_capacity = topic_partitions.sum { |topic, partition| partition_weights.dig(topic, partition) } / total_capacity
70+
partition_weights = build_partition_weights(partitions)
71+
partition_weight_per_capacity = partitions.sum { |partition| partition_weights.dig(partition.topic, partition.partition_id) } / total_capacity
7272

7373
last_index = 0
7474
member_id_to_acceptable_partition_weight = {}
@@ -77,12 +77,12 @@ def assign(members:, topics:)
7777
member_ids = instance_id_to_member_ids[instance_id]
7878
member_ids.each do |member_id|
7979
acceptable_partition_weight = capacity * partition_weight_per_capacity / member_ids.size
80-
while last_index < topic_partitions.size
81-
topic, partition = topic_partitions[last_index]
82-
partition_weight = partition_weights.dig(topic, partition)
80+
while last_index < partitions.size
81+
partition = partitions[last_index]
82+
partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
8383
break if acceptable_partition_weight - partition_weight < 0
8484

85-
group_assignment[member_id].assign(topic, [partition])
85+
member_id_to_partitions[member_id] << partition
8686
acceptable_partition_weight -= partition_weight
8787

8888
last_index += 1
@@ -93,25 +93,25 @@ def assign(members:, topics:)
9393
end
9494
end
9595

96-
while last_index < topic_partitions.size
96+
while last_index < partitions.size
9797
max_acceptable_partition_weight = member_id_to_acceptable_partition_weight.values.max
9898
member_ids = member_id_to_acceptable_partition_weight.select { |_, w| w == max_acceptable_partition_weight }.keys
9999
if member_ids.size == 1
100100
member_id = member_ids.first
101101
else
102102
member_id = member_ids.max_by { |id| instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[id]] }
103103
end
104-
topic, partition = topic_partitions[last_index]
105-
group_assignment[member_id].assign(topic, [partition])
104+
partition = partitions[last_index]
105+
member_id_to_partitions[member_id] << partition
106106

107-
partition_weight = partition_weights.dig(topic, partition)
107+
partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
108108
member_id_to_acceptable_partition_weight[member_id] -= partition_weight
109109
instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[member_id]] -= partition_weight
110110

111111
last_index += 1
112112
end
113113

114-
group_assignment
114+
member_id_to_partitions
115115
rescue Kafka::LeaderNotAvailable
116116
sleep 1
117117
retry
@@ -126,12 +126,12 @@ def calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_cap
126126
(capacity || instance_family_to_capacity.fetch(instance_family, 1) * az_to_capacity.fetch(az, 1)).to_f
127127
end
128128

129-
def build_partition_weights(topics)
129+
def build_partition_weights(partitions)
130130
# Duplicate the weights to not destruct @partition_weights or the return value of @partition_weights
131-
weights = (@partition_weights.is_a?(Proc) ? @partition_weights.call() : @partition_weights).dup
132-
topics.each do |t|
133-
weights[t] = weights[t].dup || {}
134-
weights[t].default = 1
131+
weights = (@partition_weights.is_a?(Proc) ? @partition_weights.call : @partition_weights).dup
132+
partitions.map(&:topic).uniq.each do |topic|
133+
weights[topic] = weights[topic].dup || {}
134+
weights[topic].default = 1
135135
end
136136

137137
weights

lib/kafka/ec2/mixed_instance_assignment_strategy_factory.rb

-30
This file was deleted.

0 commit comments

Comments
 (0)