Skip to content

Commit

Permalink
Support reassignment replicas table
Browse files Browse the repository at this point in the history
  • Loading branch information
kawamuray committed Dec 14, 2020
1 parent 5286f5e commit 1147706
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/main/java/kmql/TableRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import kmql.table.ConfigsTable;
import kmql.table.ConsumersTable;
import kmql.table.LogdirsTable;
import kmql.table.ReassignmentsTable;
import kmql.table.ReplicasTable;

/**
Expand All @@ -25,6 +26,7 @@ public class TableRegistry implements Iterable<Map.Entry<String, Table>> {
registerDefault(new LogdirsTable());
registerDefault(new ConfigsTable());
registerDefault(new ConsumersTable());
registerDefault(new ReassignmentsTable());
}

private final ConcurrentMap<String, Table> tables;
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/kmql/table/ReassignmentsTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package kmql.table;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import kmql.Table;

public class ReassignmentsTable implements Table {
@Override
public String name() {
return "reassignments";
}

@Override
public void create(Connection connection) throws Exception {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE TABLE reassignments ("
+ "topic VARCHAR(255) NOT NULL,"
+ "partition INT NOT NULL,"
+ "replica_id INT NOT NULL,"
+ "operation ENUM('adding', 'removing') NOT NULL,"
+ "PRIMARY KEY (topic, partition, replica_id))");
}
}

@Override
public void prepare(Connection connection, AdminClient adminClient) throws Exception {
Map<TopicPartition, PartitionReassignment> reassignments =
adminClient.listPartitionReassignments().reassignments().get();

try (PreparedStatement stmt = connection.prepareStatement(
"INSERT INTO reassignments (topic, partition, replica_id, operation)"
+ " VALUES (?, ?, ?, ?)")) {
for (Entry<TopicPartition, PartitionReassignment> entry : reassignments.entrySet()) {
TopicPartition tp = entry.getKey();
PartitionReassignment reassignment = entry.getValue();
for (Integer replica : reassignment.addingReplicas()) {
insert(stmt, tp, replica, "adding");
}
for (Integer replica : reassignment.removingReplicas()) {
insert(stmt, tp, replica, "removing");
}
}
}
}

private static void insert(PreparedStatement stmt, TopicPartition tp, int replica, String operation)
throws SQLException {
stmt.setString(1, tp.topic());
stmt.setInt(2, tp.partition());
stmt.setInt(3, replica);
stmt.setString(4, operation);
stmt.executeUpdate();
}
}
93 changes: 93 additions & 0 deletions src/test/java/kmql/table/ReassignmentsTableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package kmql.table;

import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import kmql.SqlUtils;
import lombok.Value;

public class ReassignmentsTableTest {
@Rule
public final MockitoRule rule = MockitoJUnit.rule();

@Mock
private AdminClient adminClient;

private final ReassignmentsTable table = new ReassignmentsTable();

@Value
private static class Row {
String topic;
int partition;
int replica;
String operation;

static Row fromResults(ResultSet results) throws SQLException {
return new Row(
results.getString(1),
results.getInt(2),
results.getInt(3),
results.getString(4));
}
}

@Test
public void prepare() throws Exception {
Map<TopicPartition, PartitionReassignment> reassignments = new HashMap<>();
reassignments.put(new TopicPartition("topic", 1), new PartitionReassignment(
Arrays.asList(1, 2),
Arrays.asList(1),
Arrays.asList(2)));
reassignments.put(new TopicPartition("topic", 2), new PartitionReassignment(
Arrays.asList(3),
emptyList(),
Arrays.asList(3)));
ListPartitionReassignmentsResult result = mock(ListPartitionReassignmentsResult.class);
doReturn(KafkaFuture.completedFuture(reassignments)).when(result).reassignments();
doReturn(result).when(adminClient).listPartitionReassignments();

List<Row> rows = new ArrayList<>();
try (Connection connection = SqlUtils.connection()) {
table.create(connection);
table.prepare(connection, adminClient);

try (Statement stmt = connection.createStatement();
ResultSet results = stmt.executeQuery(
"SELECT * FROM reassignments ORDER BY (topic, partition, replica_id)")) {
while (results.next()) {
Row row = Row.fromResults(results);
rows.add(row);
}
}
}

List<Row> expected = Arrays.asList(
new Row("topic", 1, 1, "adding"),
new Row("topic", 1, 2, "removing"),
new Row("topic", 2, 3, "removing"));
assertEquals(expected, rows);
}
}

0 comments on commit 1147706

Please sign in to comment.