Skip to content

Commit 9fa2fbd

Browse files
committed
Add RandomTwoChoice load balancing policy
The RandomTwoChoice policy is a wrapper load balancing policy that adds the "Power of 2 Choice" algorithm to its child policy. It will compare the first two hosts returned from the child policy query plan, and will first return the host with the target shard having fewer inflight requests. The rest of the child query plan will be left intact. It is intended to be used with TokenAwarePolicy and RoundRobinPolicy, to send queries to the replica nodes by always avoiding the worst option (the replica with the target shard having the most inflight requests will not be chosen).
1 parent 7399a0a commit 9fa2fbd

File tree

3 files changed

+293
-0
lines changed

3 files changed

+293
-0
lines changed

driver-core/src/main/java/com/datastax/driver/core/Metrics.java

+32
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,35 @@ public Integer getValue() {
130130
return value;
131131
}
132132
});
133+
private final Gauge<Map<Host, Map<Integer, Integer>>> perShardInflightRequestInfo =
134+
registry.register(
135+
"per-shard-inflight-request-info",
136+
new Gauge<Map<Host, Map<Integer, Integer>>>() {
137+
@Override
138+
public Map<Host, Map<Integer, Integer>> getValue() {
139+
Map<Host, Map<Integer, Integer>> result = new HashMap<Host, Map<Integer, Integer>>();
140+
for (SessionManager session : manager.sessions) {
141+
for (Map.Entry<Host, HostConnectionPool> poolEntry : session.pools.entrySet()) {
142+
HostConnectionPool hostConnectionPool = poolEntry.getValue();
143+
Map<Integer, Integer> perShardInflightRequests = new HashMap<Integer, Integer>();
144+
145+
for (int shardId = 0;
146+
shardId < hostConnectionPool.connections.length;
147+
shardId++) {
148+
int shardInflightRequests = 0;
149+
for (Connection connection : hostConnectionPool.connections[shardId]) {
150+
shardInflightRequests += connection.inFlight.get();
151+
}
152+
perShardInflightRequests.put(shardId, shardInflightRequests);
153+
}
154+
155+
result.put(poolEntry.getKey(), perShardInflightRequests);
156+
}
157+
}
133158

159+
return result;
160+
}
161+
});
134162
private final Gauge<Integer> executorQueueDepth;
135163
private final Gauge<Integer> blockingExecutorQueueDepth;
136164
private final Gauge<Integer> reconnectionSchedulerQueueSize;
@@ -374,6 +402,10 @@ public Gauge<Map<Host, Integer>> getShardAwarenessInfo() {
374402
return shardAwarenessInfo;
375403
}
376404

405+
public Gauge<Map<Host, Map<Integer, Integer>>> getPerShardInflightRequestInfo() {
406+
return perShardInflightRequestInfo;
407+
}
408+
377409
/**
378410
* Returns the number of bytes sent so far.
379411
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package com.datastax.driver.core.policies;
2+
3+
import com.datastax.driver.core.*;
4+
import com.google.common.collect.AbstractIterator;
5+
import java.nio.ByteBuffer;
6+
import java.util.*;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
/**
10+
* A wrapper load balancing policy that adds "Power of 2 Choice" algorithm to a child policy.
11+
*
12+
* <p>This policy encapsulates another policy. The resulting policy works in the following way:
13+
*
14+
* <ul>
15+
* <li>the {@code distance} method is inherited from the child policy.
16+
* <li>the {@code newQueryPlan} method will compare first two hosts (by number of inflight requests) returned from
17+
* the {@code newQueryPlan} method of the child policy, and the host with fewer number of inflight requests will be
18+
* returned the first. It will allow to always avoid the worst option (comparing by number of inflight requests).
19+
* <li>besides the first two hosts returned by the child policy's {@code newQueryPlan} method, the ordering of the
20+
* rest of the hosts will remain the same.
21+
* </ul>
22+
*
23+
* <p>If you wrap the {@code RandomTwoChoicePolicy} policy with {@code TokenAwarePolicy}, it will compare the first two
24+
* replicas by the number of inflight requests, and the worse option will always be avoided. In that case, it is recommended
25+
* to use the TokenAwarePolicy with {@code ReplicaOrdering.RANDOM strategy}, which will return the replicas in a shuffled
26+
* order and thus will make the "Power of 2 Choice" algorithm more efficient.
27+
*/
28+
public class RandomTwoChoicePolicy implements ChainableLoadBalancingPolicy {
29+
private final LoadBalancingPolicy childPolicy;
30+
private volatile Metrics metrics;
31+
private volatile Metadata clusterMetadata;
32+
private volatile ProtocolVersion protocolVersion;
33+
private volatile CodecRegistry codecRegistry;
34+
35+
/**
36+
* Creates a new {@code RandomTwoChoicePolicy}.
37+
*
38+
* @param childPolicy the load balancing policy to wrap with "Power of 2 Choice" algorithm.
39+
*/
40+
public RandomTwoChoicePolicy(LoadBalancingPolicy childPolicy) {
41+
this.childPolicy = childPolicy;
42+
}
43+
44+
@Override
45+
public LoadBalancingPolicy getChildPolicy() {
46+
return childPolicy;
47+
}
48+
49+
@Override
50+
public void init(Cluster cluster, Collection<Host> hosts) {
51+
this.metrics = cluster.getMetrics();
52+
this.clusterMetadata = cluster.getMetadata();
53+
this.protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
54+
this.codecRegistry = cluster.getConfiguration().getCodecRegistry();
55+
childPolicy.init(cluster, hosts);
56+
}
57+
58+
/**
59+
* {@inheritDoc}
60+
*
61+
* <p>This implementation always returns distances as reported by the wrapped policy.
62+
*/
63+
@Override
64+
public HostDistance distance(Host host) {
65+
return childPolicy.distance(host);
66+
}
67+
68+
/**
69+
* {@inheritDoc}
70+
*
71+
* <p>The returned plan will compare (by the number of inflight requests) the first 2 hosts returned by the child
72+
* policy's {@code newQueryPlan} method, and the host with fewer inflight requests will be returned the first.
73+
* The rest of the child policy's query plan will be left intact.
74+
*/
75+
@Override
76+
public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
77+
String keyspace = statement.getKeyspace();
78+
if (keyspace == null) keyspace = loggedKeyspace;
79+
80+
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);
81+
if (routingKey == null || keyspace == null) {
82+
return childPolicy.newQueryPlan(loggedKeyspace, statement);
83+
}
84+
85+
final Token t = clusterMetadata.newToken(statement.getPartitioner(), routingKey);
86+
final Iterator<Host> childIterator = childPolicy.newQueryPlan(keyspace, statement);
87+
88+
final Host host1 = childIterator.hasNext() ? childIterator.next() : null;
89+
final Host host2 = childIterator.hasNext() ? childIterator.next() : null;
90+
91+
final AtomicInteger host1ShardInflightRequests = new AtomicInteger(0);
92+
final AtomicInteger host2ShardInflightRequests = new AtomicInteger(0);
93+
94+
if (host1 != null) {
95+
final int host1ShardId = host1.getShardingInfo().shardId(t);
96+
host1ShardInflightRequests.set(
97+
metrics.getPerShardInflightRequestInfo().getValue().get(host1).get(host1ShardId));
98+
}
99+
100+
if (host2 != null) {
101+
final int host2ShardId = host2.getShardingInfo().shardId(t);
102+
host2ShardInflightRequests.set(
103+
metrics.getPerShardInflightRequestInfo().getValue().get(host2).get(host2ShardId));
104+
}
105+
106+
return new AbstractIterator<Host>() {
107+
private final Host firstChosenHost =
108+
host1ShardInflightRequests.get() < host2ShardInflightRequests.get() ? host1 : host2;
109+
private final Host secondChosenHost =
110+
host1ShardInflightRequests.get() < host2ShardInflightRequests.get() ? host2 : host1;
111+
private int index = 0;
112+
113+
@Override
114+
protected Host computeNext() {
115+
if (index == 0) {
116+
index++;
117+
return firstChosenHost;
118+
} else if (index == 1) {
119+
index++;
120+
return secondChosenHost;
121+
} else if (childIterator.hasNext()) {
122+
return childIterator.next();
123+
}
124+
125+
return endOfData();
126+
}
127+
};
128+
}
129+
130+
@Override
131+
public void onAdd(Host host) {
132+
childPolicy.onAdd(host);
133+
}
134+
135+
@Override
136+
public void onUp(Host host) {
137+
childPolicy.onUp(host);
138+
}
139+
140+
@Override
141+
public void onDown(Host host) {
142+
childPolicy.onDown(host);
143+
}
144+
145+
@Override
146+
public void onRemove(Host host) {
147+
childPolicy.onRemove(host);
148+
}
149+
150+
@Override
151+
public void close() {
152+
childPolicy.close();
153+
}
154+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package com.datastax.driver.core.policies;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.Mockito.mock;
5+
import static org.mockito.Mockito.when;
6+
7+
import com.codahale.metrics.Gauge;
8+
import com.datastax.driver.core.*;
9+
import java.nio.ByteBuffer;
10+
import java.util.*;
11+
import org.assertj.core.util.Sets;
12+
import org.testng.annotations.BeforeMethod;
13+
import org.testng.annotations.Test;
14+
15+
public class RandomTwoChoicePolicyTest {
16+
private final ByteBuffer routingKey = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
17+
private final RegularStatement statement =
18+
new SimpleStatement("irrelevant").setRoutingKey(routingKey).setKeyspace("keyspace");
19+
private final Host host1 = mock(Host.class);
20+
private final Host host2 = mock(Host.class);
21+
private final Host host3 = mock(Host.class);
22+
private Cluster cluster;
23+
24+
@SuppressWarnings("unchecked")
25+
private final Gauge<Map<Host, Map<Integer, Integer>>> gauge =
26+
mock((Class<Gauge<Map<Host, Map<Integer, Integer>>>>) (Object) Gauge.class);
27+
28+
@BeforeMethod(groups = "unit")
29+
public void initMocks() {
30+
CodecRegistry codecRegistry = new CodecRegistry();
31+
cluster = mock(Cluster.class);
32+
Configuration configuration = mock(Configuration.class);
33+
ProtocolOptions protocolOptions = mock(ProtocolOptions.class);
34+
Metadata metadata = mock(Metadata.class);
35+
Metrics metrics = mock(Metrics.class);
36+
Token t = mock(Token.class);
37+
ShardingInfo shardingInfo = mock(ShardingInfo.class);
38+
39+
when(metrics.getPerShardInflightRequestInfo()).thenReturn(gauge);
40+
when(cluster.getConfiguration()).thenReturn(configuration);
41+
when(configuration.getCodecRegistry()).thenReturn(codecRegistry);
42+
when(configuration.getProtocolOptions()).thenReturn(protocolOptions);
43+
when(protocolOptions.getProtocolVersion()).thenReturn(ProtocolVersion.DEFAULT);
44+
when(cluster.getMetadata()).thenReturn(metadata);
45+
when(cluster.getMetrics()).thenReturn(metrics);
46+
when(metadata.getReplicas(Metadata.quote("keyspace"), null, routingKey))
47+
.thenReturn(Sets.newLinkedHashSet(host1, host2, host3));
48+
when(metadata.newToken(null, routingKey)).thenReturn(t);
49+
when(host1.getShardingInfo()).thenReturn(shardingInfo);
50+
when(host2.getShardingInfo()).thenReturn(shardingInfo);
51+
when(host3.getShardingInfo()).thenReturn(shardingInfo);
52+
when(shardingInfo.shardId(t)).thenReturn(0);
53+
when(host1.isUp()).thenReturn(true);
54+
when(host2.isUp()).thenReturn(true);
55+
when(host3.isUp()).thenReturn(true);
56+
}
57+
58+
@Test(groups = "unit")
59+
public void should_prefer_host_with_less_inflight_requests() {
60+
// given
61+
Map<Host, Map<Integer, Integer>> perHostInflightRequests =
62+
new HashMap<Host, Map<Integer, Integer>>() {
63+
{
64+
put(
65+
host1,
66+
new HashMap<Integer, Integer>() {
67+
{
68+
put(0, 6);
69+
}
70+
});
71+
put(
72+
host2,
73+
new HashMap<Integer, Integer>() {
74+
{
75+
put(0, 2);
76+
}
77+
});
78+
put(
79+
host3,
80+
new HashMap<Integer, Integer>() {
81+
{
82+
put(0, 4);
83+
}
84+
});
85+
}
86+
};
87+
RandomTwoChoicePolicy policy = new RandomTwoChoicePolicy(new TokenAwarePolicy(new RoundRobinPolicy(), TokenAwarePolicy.ReplicaOrdering.TOPOLOGICAL));
88+
policy.init(
89+
cluster,
90+
new ArrayList<Host>() {
91+
92+
{
93+
add(host1);
94+
add(host2);
95+
add(host3);
96+
}
97+
});
98+
when(gauge.getValue()).thenReturn(perHostInflightRequests);
99+
100+
Iterator<Host> queryPlan = policy.newQueryPlan("keyspace", statement);
101+
// host2 should appear first in the query plan with fewer inflight requests than host1
102+
103+
assertThat(queryPlan.next()).isEqualTo(host2);
104+
assertThat(queryPlan.next()).isEqualTo(host1);
105+
assertThat(queryPlan.next()).isEqualTo(host3);
106+
}
107+
}

0 commit comments

Comments
 (0)