Skip to content

Commit cd00c97

Browse files
authored
Make schema agreement query pull only columns that are used (#470)
Currently schema agreement logic run `select * from system.peers`, while only `schema_version` is used, it creates excessive load on cluster and driver side.
1 parent 8cec131 commit cd00c97

File tree

2 files changed

+48
-15
lines changed

2 files changed

+48
-15
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java

+27-8
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,11 @@ private void sendQueries() {
108108
} else {
109109
CompletionStage<AdminResult> localQuery =
110110
query("SELECT schema_version FROM system.local WHERE key='local'");
111-
CompletionStage<AdminResult> peersQuery = query("SELECT * FROM system.peers");
111+
112+
// `tokens` column is excluded, it is served from
113+
// context.getMetadataManager().getMetadata().getTokenMap()
114+
CompletionStage<AdminResult> peersQuery =
115+
query("SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers");
112116

113117
localQuery
114118
.thenCombine(peersQuery, this::extractSchemaVersions)
@@ -142,9 +146,15 @@ private Set<UUID> extractSchemaVersions(AdminResult controlNodeResult, AdminResu
142146
channel.getEndPoint());
143147
}
144148

149+
boolean allowZeroTokenNodes =
150+
context
151+
.getConfig()
152+
.getDefaultProfile()
153+
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS);
154+
145155
Map<UUID, Node> nodes = context.getMetadataManager().getMetadata().getNodes();
146156
for (AdminRow peerRow : peersResult) {
147-
if (isPeerValid(peerRow, nodes)) {
157+
if (isPeerValid(peerRow, nodes, allowZeroTokenNodes)) {
148158
UUID schemaVersion = Objects.requireNonNull(peerRow.getUuid("schema_version"));
149159
schemaVersions.add(schemaVersion);
150160
}
@@ -189,13 +199,13 @@ protected CompletionStage<AdminResult> query(String queryString) {
189199
.start();
190200
}
191201

192-
protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
202+
protected boolean isPeerValid(
203+
AdminRow peerRow, Map<UUID, Node> nodes, boolean allowZeroTokenNodes) {
193204
if (PeerRowValidator.isValid(
194205
peerRow,
195-
context
196-
.getConfig()
197-
.getDefaultProfile()
198-
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS))) {
206+
// allowZeroTokenPeers is true since `tokens` column is not pulled, but it will make it
207+
// ignore `tokens` column.
208+
true)) {
199209
UUID hostId = peerRow.getUuid("host_id");
200210
Node node = nodes.get(hostId);
201211
if (node == null) {
@@ -205,7 +215,16 @@ protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
205215
LOG.debug("[{}] Peer {} is down, excluding from schema agreement check", logPrefix, hostId);
206216
return false;
207217
}
208-
return true;
218+
219+
if (allowZeroTokenNodes) {
220+
return true;
221+
}
222+
223+
if (!(node instanceof DefaultNode)) {
224+
return true;
225+
}
226+
227+
return !((DefaultNode) node).getRawTokens().isEmpty();
209228
} else {
210229
LOG.warn(
211230
"[{}] Found invalid system.peers row for peer: {}, excluding from schema agreement check.",

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ public void should_succeed_if_only_one_node() {
139139
new StubbedQuery(
140140
"SELECT schema_version FROM system.local WHERE key='local'",
141141
mockResult(mockLocalRow(VERSION1))),
142-
new StubbedQuery("SELECT * FROM system.peers", mockResult(/*empty*/ )));
142+
new StubbedQuery(
143+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
144+
mockResult(/*empty*/ )));
143145

144146
// When
145147
CompletionStage<Boolean> future = checker.run();
@@ -156,7 +158,9 @@ public void should_succeed_if_versions_match_on_first_try() {
156158
new StubbedQuery(
157159
"SELECT schema_version FROM system.local WHERE key='local'",
158160
mockResult(mockLocalRow(VERSION1))),
159-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
161+
new StubbedQuery(
162+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
163+
mockResult(mockValidPeerRow(VERSION1))));
160164

161165
// When
162166
CompletionStage<Boolean> future = checker.run();
@@ -174,7 +178,9 @@ public void should_ignore_down_peers() {
174178
new StubbedQuery(
175179
"SELECT schema_version FROM system.local WHERE key='local'",
176180
mockResult(mockLocalRow(VERSION1))),
177-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))));
181+
new StubbedQuery(
182+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
183+
mockResult(mockValidPeerRow(VERSION2))));
178184

179185
// When
180186
CompletionStage<Boolean> future = checker.run();
@@ -210,7 +216,9 @@ public void should_ignore_malformed_rows(AdminRow malformedPeer) {
210216
new StubbedQuery(
211217
"SELECT schema_version FROM system.local WHERE key='local'",
212218
mockResult(mockLocalRow(VERSION1))),
213-
new StubbedQuery("SELECT * FROM system.peers", mockResult(malformedPeer)));
219+
new StubbedQuery(
220+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
221+
mockResult(malformedPeer)));
214222

215223
// When
216224
CompletionStage<Boolean> future = checker.run();
@@ -228,13 +236,17 @@ public void should_reschedule_if_versions_do_not_match_on_first_try() {
228236
new StubbedQuery(
229237
"SELECT schema_version FROM system.local WHERE key='local'",
230238
mockResult(mockLocalRow(VERSION1))),
231-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))),
239+
new StubbedQuery(
240+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
241+
mockResult(mockValidPeerRow(VERSION2))),
232242

233243
// Second round
234244
new StubbedQuery(
235245
"SELECT schema_version FROM system.local WHERE key='local'",
236246
mockResult(mockLocalRow(VERSION1))),
237-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
247+
new StubbedQuery(
248+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
249+
mockResult(mockValidPeerRow(VERSION1))));
238250

239251
// When
240252
CompletionStage<Boolean> future = checker.run();
@@ -253,7 +265,9 @@ public void should_fail_if_versions_do_not_match_after_timeout() {
253265
new StubbedQuery(
254266
"SELECT schema_version FROM system.local WHERE key='local'",
255267
mockResult(mockLocalRow(VERSION1))),
256-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
268+
new StubbedQuery(
269+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
270+
mockResult(mockValidPeerRow(VERSION1))));
257271

258272
// When
259273
CompletionStage<Boolean> future = checker.run();

0 commit comments

Comments
 (0)