Skip to content

Commit a67569c

Browse files
committed
Add logdirs table
1 parent 52e993e commit a67569c

File tree

4 files changed

+94
-0
lines changed

4 files changed

+94
-0
lines changed

src/main/java/kmql/Database.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.sql.ResultSet;
66
import java.sql.SQLException;
77
import java.sql.Statement;
8+
import java.util.Collection;
89
import java.util.HashMap;
910
import java.util.Map;
1011
import java.util.Map.Entry;
@@ -51,6 +52,10 @@ public void prepareTable(String name, AdminClient adminClient) throws Exception
5152
if (meta.initialized) {
5253
return;
5354
}
55+
Collection<String> dependencyTables = meta.table.dependencyTables();
56+
for (String dependencyTable : dependencyTables) {
57+
prepareTable(dependencyTable, adminClient);
58+
}
5459
meta.table.prepare(connection, adminClient);
5560
meta.initialized = true;
5661
}

src/main/java/kmql/Table.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package kmql;
22

3+
import static java.util.Collections.emptyList;
4+
35
import java.sql.Connection;
6+
import java.util.Collection;
47

58
import org.apache.kafka.clients.admin.AdminClient;
69

710
public interface Table {
811
String name();
912

13+
default Collection<String> dependencyTables() {
14+
return emptyList();
15+
}
16+
1017
void prepare(Connection connection, AdminClient adminClient) throws Exception;
1118
}

src/main/java/kmql/TableRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.concurrent.ConcurrentMap;
99

1010
import kmql.table.BrokersTable;
11+
import kmql.table.LogdirsTable;
1112
import kmql.table.ReplicasTable;
1213

1314
public class TableRegistry implements Iterable<Map.Entry<String, Table>> {
@@ -16,6 +17,7 @@ public class TableRegistry implements Iterable<Map.Entry<String, Table>> {
1617
static {
1718
registerDefault(new ReplicasTable());
1819
registerDefault(new BrokersTable());
20+
registerDefault(new LogdirsTable());
1921
}
2022

2123
private final ConcurrentMap<String, Table> tables;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package kmql.table;
2+
3+
import static java.util.Collections.singletonList;
4+
5+
import java.sql.Connection;
6+
import java.sql.PreparedStatement;
7+
import java.sql.ResultSet;
8+
import java.sql.Statement;
9+
import java.util.ArrayList;
10+
import java.util.Collection;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Map.Entry;
14+
15+
import org.apache.kafka.clients.admin.AdminClient;
16+
import org.apache.kafka.common.TopicPartition;
17+
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
18+
import org.apache.kafka.common.requests.DescribeLogDirsResponse.ReplicaInfo;
19+
20+
import kmql.Table;
21+
22+
public class LogdirsTable implements Table {
23+
@Override
24+
public String name() {
25+
return "logdirs";
26+
}
27+
28+
@Override
29+
public Collection<String> dependencyTables() {
30+
return singletonList("brokers");
31+
}
32+
33+
@Override
34+
public void prepare(Connection connection, AdminClient adminClient) throws Exception {
35+
try (Statement stmt = connection.createStatement()) {
36+
stmt.execute("CREATE TABLE logdirs ("
37+
+ "broker_id INT NOT NULL,"
38+
+ "path VARCHAR(255) NOT NULL,"
39+
+ "topic VARCHAR(255) NOT NULL,"
40+
+ "partition INT NOT NULL,"
41+
+ "size BIGINT NOT NULL,"
42+
+ "offset_lag BIGINT NOT NULL,"
43+
+ "is_future BOOLEAN NOT NULL,"
44+
+ "PRIMARY KEY (broker_id, path, topic, partition))");
45+
}
46+
47+
List<Integer> brokerIds = new ArrayList<>();
48+
try (Statement stmt = connection.createStatement();
49+
ResultSet results = stmt.executeQuery("SELECT id FROM brokers")) {
50+
while (results.next()) {
51+
int brokerId = results.getInt(1);
52+
brokerIds.add(brokerId);
53+
}
54+
}
55+
56+
Map<Integer, Map<String, LogDirInfo>> logDirs = adminClient.describeLogDirs(brokerIds).all().get();
57+
try (PreparedStatement stmt = connection.prepareStatement(
58+
"INSERT INTO logdirs (broker_id, path, topic, partition, size, offset_lag, is_future) VALUES (?, ?, ?, ?, ?, ?, ?)")) {
59+
for (Entry<Integer, Map<String, LogDirInfo>> entry : logDirs.entrySet()) {
60+
int brokerId = entry.getKey();
61+
for (Entry<String, LogDirInfo> dirEntry : entry.getValue().entrySet()) {
62+
String path = dirEntry.getKey();
63+
LogDirInfo info = dirEntry.getValue();
64+
for (Entry<TopicPartition, ReplicaInfo> replicaEntry : info.replicaInfos.entrySet()) {
65+
TopicPartition tp = replicaEntry.getKey();
66+
ReplicaInfo replicaInfo = replicaEntry.getValue();
67+
stmt.setInt(1, brokerId);
68+
stmt.setString(2, path);
69+
stmt.setString(3, tp.topic());
70+
stmt.setInt(4, tp.partition());
71+
stmt.setLong(5, replicaInfo.size);
72+
stmt.setLong(6, replicaInfo.offsetLag);
73+
stmt.setBoolean(7, replicaInfo.isFuture);
74+
stmt.executeUpdate();
75+
}
76+
}
77+
}
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)