Skip to content

Commit 64b3568

Browse files
committed
Expose a decorator for CqlPrepareAsyncProcessor cache rather than the ability to specify an
arbitrary cache from scratch. Also bringing tests from #2003 forward with a few minor changes due to this implementation patch by Bret McGuire; reviewed by Bret McGuire and Andy Tolbert reference: #2008
1 parent eac7b24 commit 64b3568

File tree

3 files changed

+178
-12
lines changed

3 files changed

+178
-12
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
3939
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
4040
import com.datastax.oss.protocol.internal.ProtocolConstants;
41+
import com.google.common.base.Functions;
4142
import edu.umd.cs.findbugs.annotations.NonNull;
4243
import io.netty.util.concurrent.EventExecutor;
4344
import java.util.Map;
4445
import java.util.Optional;
4546
import java.util.concurrent.CompletableFuture;
4647
import java.util.concurrent.CompletionStage;
4748
import java.util.concurrent.ExecutionException;
49+
import java.util.function.Function;
4850
import net.jcip.annotations.ThreadSafe;
4951
import org.slf4j.Logger;
5052
import org.slf4j.LoggerFactory;
@@ -62,14 +64,15 @@ public CqlPrepareAsyncProcessor() {
6264
}
6365

6466
public CqlPrepareAsyncProcessor(@NonNull Optional<? extends DefaultDriverContext> context) {
65-
this(CacheBuilder.newBuilder().weakValues().build(), context);
67+
this(context, Functions.identity());
6668
}
6769

6870
protected CqlPrepareAsyncProcessor(
69-
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache,
70-
Optional<? extends DefaultDriverContext> context) {
71+
Optional<? extends DefaultDriverContext> context,
72+
Function<CacheBuilder<Object, Object>, CacheBuilder<Object, Object>> decorator) {
7173

72-
this.cache = cache;
74+
CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder().weakValues();
75+
this.cache = decorator.apply(baseCache).build();
7376
context.ifPresent(
7477
(ctx) -> {
7578
LOG.info("Adding handler to invalidate cached prepared statements on type changes");

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2525
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
2626
import com.datastax.oss.driver.api.core.context.DriverContext;
27-
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
2827
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2928
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
3029
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
@@ -41,7 +40,6 @@
4140
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
4241
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
4342
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
44-
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
4543
import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener;
4644
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
4745
import com.google.common.collect.ImmutableList;
@@ -119,11 +117,12 @@ private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcess
119117
private static final Logger LOG =
120118
LoggerFactory.getLogger(PreparedStatementCachingIT.TestCqlPrepareAsyncProcessor.class);
121119

122-
private static RemovalListener<PrepareRequest, CompletableFuture<PreparedStatement>>
123-
buildCacheRemoveCallback(@NonNull Optional<DefaultDriverContext> context) {
120+
private static RemovalListener<Object, Object> buildCacheRemoveCallback(
121+
@NonNull Optional<DefaultDriverContext> context) {
124122
return (evt) -> {
125123
try {
126-
CompletableFuture<PreparedStatement> future = evt.getValue();
124+
CompletableFuture<PreparedStatement> future =
125+
(CompletableFuture<PreparedStatement>) evt.getValue();
127126
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
128127
context.ifPresent(
129128
ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)));
@@ -136,9 +135,7 @@ private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcess
136135
public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
137136
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
138137
// to prevent cache entries from unexpectedly disappearing mid-test.
139-
super(
140-
CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context)).build(),
141-
context);
138+
super(context, builder -> builder.removalListener(buildCacheRemoveCallback(context)));
142139
}
143140
}
144141

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.core.cql;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.junit.Assert.fail;
22+
23+
import com.datastax.oss.driver.api.core.CqlSession;
24+
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
25+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
26+
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
27+
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
28+
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
29+
import com.datastax.oss.driver.categories.IsolatedTests;
30+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
31+
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
32+
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
33+
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
34+
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
35+
import java.util.concurrent.CompletableFuture;
36+
import org.junit.After;
37+
import org.junit.Before;
38+
import org.junit.Rule;
39+
import org.junit.Test;
40+
import org.junit.experimental.categories.Category;
41+
import org.junit.rules.RuleChain;
42+
import org.junit.rules.TestRule;
43+
44+
@Category(IsolatedTests.class)
45+
public class PreparedStatementCancellationIT {
46+
47+
private CustomCcmRule ccmRule = CustomCcmRule.builder().build();
48+
49+
private SessionRule<CqlSession> sessionRule = SessionRule.builder(ccmRule).build();
50+
51+
@Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
52+
53+
@Before
54+
public void setup() {
55+
56+
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
57+
session.execute("DROP TABLE IF EXISTS test_table_1");
58+
session.execute("CREATE TABLE test_table_1 (k int primary key, v int)");
59+
session.execute("INSERT INTO test_table_1 (k,v) VALUES (1, 100)");
60+
session.execute("INSERT INTO test_table_1 (k,v) VALUES (2, 200)");
61+
session.execute("INSERT INTO test_table_1 (k,v) VALUES (3, 300)");
62+
session.close();
63+
}
64+
65+
@After
66+
public void teardown() {
67+
68+
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
69+
session.execute("DROP TABLE test_table_1");
70+
session.close();
71+
}
72+
73+
private CompletableFuture<PreparedStatement> toCompletableFuture(CqlSession session, String cql) {
74+
75+
return session.prepareAsync(cql).toCompletableFuture();
76+
}
77+
78+
private CqlPrepareAsyncProcessor findProcessor(CqlSession session) {
79+
80+
DefaultDriverContext context = (DefaultDriverContext) session.getContext();
81+
return (CqlPrepareAsyncProcessor)
82+
Iterables.find(
83+
context.getRequestProcessorRegistry().getProcessors(),
84+
Predicates.instanceOf(CqlPrepareAsyncProcessor.class));
85+
}
86+
87+
@Test
88+
public void should_cache_valid_cql() throws Exception {
89+
90+
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
91+
CqlPrepareAsyncProcessor processor = findProcessor(session);
92+
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache = processor.getCache();
93+
assertThat(cache.size()).isEqualTo(0);
94+
95+
// Make multiple CompletableFuture requests for the specified CQL, then wait until
96+
// the cached request finishes and confirm that all futures got the same values
97+
String cql = "select v from test_table_1 where k = ?";
98+
CompletableFuture<PreparedStatement> cf1 = toCompletableFuture(session, cql);
99+
CompletableFuture<PreparedStatement> cf2 = toCompletableFuture(session, cql);
100+
assertThat(cache.size()).isEqualTo(1);
101+
102+
CompletableFuture<PreparedStatement> future = Iterables.get(cache.asMap().values(), 0);
103+
PreparedStatement stmt = future.get();
104+
105+
assertThat(cf1.isDone()).isTrue();
106+
assertThat(cf2.isDone()).isTrue();
107+
108+
assertThat(cf1.join()).isEqualTo(stmt);
109+
assertThat(cf2.join()).isEqualTo(stmt);
110+
}
111+
112+
// A holdover from work done on JAVA-3055. This probably isn't _desired_ behaviour but this test
113+
// documents the fact that the current driver impl will behave in this way. We should probably
114+
// consider changing this in a future release, although it's worthwhile fully considering the
115+
// implications of such a change.
116+
@Test
117+
public void will_cache_invalid_cql() throws Exception {
118+
119+
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
120+
CqlPrepareAsyncProcessor processor = findProcessor(session);
121+
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache = processor.getCache();
122+
assertThat(cache.size()).isEqualTo(0);
123+
124+
// Verify that we get the CompletableFuture even if the CQL is invalid but that nothing is
125+
// cached
126+
String cql = "select v fromfrom test_table_1 where k = ?";
127+
CompletableFuture<PreparedStatement> cf = toCompletableFuture(session, cql);
128+
129+
// join() here should throw exceptions due to the invalid syntax... for purposes of this test we
130+
// can ignore this
131+
try {
132+
cf.join();
133+
fail();
134+
} catch (Exception e) {
135+
}
136+
137+
assertThat(cache.size()).isEqualTo(1);
138+
}
139+
140+
@Test
141+
public void should_not_affect_cache_if_returned_futures_are_cancelled() throws Exception {
142+
143+
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
144+
CqlPrepareAsyncProcessor processor = findProcessor(session);
145+
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache = processor.getCache();
146+
assertThat(cache.size()).isEqualTo(0);
147+
148+
String cql = "select v from test_table_1 where k = ?";
149+
CompletableFuture<PreparedStatement> cf = toCompletableFuture(session, cql);
150+
151+
assertThat(cf.isCancelled()).isFalse();
152+
assertThat(cf.cancel(false)).isTrue();
153+
assertThat(cf.isCancelled()).isTrue();
154+
assertThat(cf.isCompletedExceptionally()).isTrue();
155+
156+
// Confirm that cancelling the CompletableFuture returned to the user does _not_ cancel the
157+
// future used within the cache. CacheEntry very deliberately doesn't maintain a reference
158+
// to it's contained CompletableFuture so we have to get at this by secondary effects.
159+
assertThat(cache.size()).isEqualTo(1);
160+
CompletableFuture<PreparedStatement> future = Iterables.get(cache.asMap().values(), 0);
161+
PreparedStatement rv = future.get();
162+
assertThat(rv).isNotNull();
163+
assertThat(rv.getQuery()).isEqualTo(cql);
164+
assertThat(cache.size()).isEqualTo(1);
165+
}
166+
}

0 commit comments

Comments
 (0)