Skip to content

Commit 1d58dce

Browse files
committed
Bug 37315346 - [37264961->24.09.1] CQC constructor with fCacheValues of false should configure lite mapListener (ce-main-> ce-24.09)
Remote remote.full on coherence-ce/release/coherence-ce-v24.09 success, changes 112626, synced @112626, job.9.20241127123943.7673 [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 112634]
1 parent 107ef5b commit 1d58dce

File tree

3 files changed

+210
-16
lines changed

3 files changed

+210
-16
lines changed

prj/coherence-core/src/main/java/com/tangosol/net/cache/ContinuousQueryCache.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
221221
* @param filter the {@link Filter} that defines the view
222222
* @param fCacheValues pass {@code true} to cache both the keys and values of the
223223
* materialized view locally, or {@code false} to only cache
224-
* the keys
224+
* the keys. Override of {@code false} described in {@link #isCacheValues()}.
225225
*/
226226
public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter, boolean fCacheValues)
227227
{
@@ -247,7 +247,7 @@ public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter, boolean
247247
public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
248248
MapListener<? super K, ? super V_FRONT> listener)
249249
{
250-
this(cache, filter, false, listener, null);
250+
this(cache, filter, true, listener, null);
251251
}
252252

253253
/**
@@ -284,15 +284,19 @@ public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
284284
*
285285
* @param cache the {@link NamedCache} to create a view of
286286
* @param filter the {@link Filter} that defines the view
287-
* @param fCacheValues pass true to cache both the keys and values of the
288-
* materialized view locally, or false to only cache
289-
* the keys
287+
* @param fCacheValues pass {@code true} to cache both the keys and values of the
288+
* materialized view locally, or {@code false} to only cache
289+
* the keys. Override of {@code false} described in {@link #isCacheValues()}
290290
* @param listener an optional {@link MapListener} that will receive all
291291
* events starting from the initialization of the {@code ContinuousQueryCache}
292292
* @param transformer an optional {@link ValueExtractor} that would be used to
293293
* transform values retrieved from the underlying cache
294294
* before storing them locally; if specified, this
295295
* {@code ContinuousQueryCache} will become "read-only"
296+
* <p>
297+
* Note: When parameter {@code fCacheValues} is {@code false}, it is inferred that provided parameter
298+
* {@code listener} is a lite listener as described by {@code fLite} parameter of
299+
* {@link #addMapListener(MapListener, Filter, boolean)}.
296300
*/
297301
public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
298302
boolean fCacheValues, MapListener<? super K, ? super V_FRONT> listener,
@@ -321,7 +325,7 @@ public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
321325
* @param filter the {@link Filter} that defines the view
322326
* @param fCacheValues pass {@code true} to cache both the keys and values of the
323327
* materialized view locally, or {@code false} to only cache
324-
* the keys
328+
* the keys. Override of {@code false} described in {@link #isCacheValues()}
325329
* @param listener an optional {@link MapListener} that will receive all
326330
* events starting from the initialization of the
327331
* {@code ContinuousQueryCache}
@@ -330,6 +334,10 @@ public ContinuousQueryCache(NamedCache<K, V_BACK> cache, Filter filter,
330334
* before storing them locally; if specified, this
331335
* {@code ContinuousQueryCache} will become <em>read-only</em>
332336
* @param loader an optional {@link ClassLoader}
337+
* * <p>
338+
* Note: When parameter {@code fCacheValues} is {@code false}, it is inferred that the provided parameter
339+
* {@code listener} is a lite listener as described by {@code fLite} parameter of
340+
* {@link #addMapListener(MapListener, Filter, boolean)}.
333341
*
334342
* @since 12.2.1.4
335343
*/
@@ -365,6 +373,8 @@ public ContinuousQueryCache(Supplier<NamedCache<K, V_BACK>> supplierCache, Filte
365373
m_fReadOnly = transformer != null;
366374
m_nState = STATE_DISCONNECTED;
367375

376+
// initialize Observable listener on whether a standard (non-lite) listener passed at construction time
377+
m_fListeners = listener != null && fCacheValues;
368378
if (listener instanceof MapTriggerListener)
369379
{
370380
throw new IllegalArgumentException("ContinuousQueryCache does not support MapTriggerListeners");
@@ -458,13 +468,20 @@ public Filter getFilter()
458468

459469
/**
460470
* Determine if this {@code ContinuousQueryCache} caches values locally.
471+
* <p>
472+
* Note: if {@link #addMapListener(MapListener, Filter, boolean) addMapListener} adds
473+
* a standard (non-lite) listener or a filter to this {@link ObservableMap},
474+
* cache values are always maintained locally. The locally cached values are
475+
* used to filter events and to supply the {@link MapEvent#getOldValue() old}
476+
* and {@link MapEvent#getNewValue() new} values for the events that it raises.
477+
* Additionally, a non-null {@link #getTransformer() transformer} infers caches values being stored locally.
461478
*
462479
* @return {@code true} if this object caches values locally, and {@code false} if it
463480
* relies on the underlying {@link NamedCache}
464481
*/
465482
public boolean isCacheValues()
466483
{
467-
return m_fCacheValues || isObserved();
484+
return m_fCacheValues || isObserved() || getTransformer() != null;
468485
}
469486

470487
/**
@@ -478,7 +495,7 @@ public boolean isCacheValues()
478495
* <p>
479496
*
480497
* @param fCacheValues pass {@code true} to enable local caching, or {@code false}
481-
* to disable it
498+
* to disable it. Override of {@code false} described in {@link #isCacheValues()}.
482499
*/
483500
public synchronized void setCacheValues(boolean fCacheValues)
484501
{
@@ -572,13 +589,13 @@ protected ObservableMap<K, V_FRONT> ensureInternalCache()
572589
m_mapLocal = instantiateInternalCache();
573590

574591
MapListener mapListener = m_mapListener;
592+
boolean fLite = !isCacheValues();
575593
if (mapListener != null)
576594
{
577595
// the initial listener has to hear the initial events
578596
ensureEventQueue();
579597
ensureListenerSupport().addListener(
580-
instantiateEventRouter(mapListener, false), (Filter) null, false);
581-
m_fListeners = true;
598+
instantiateEventRouter(mapListener, fLite), (Filter) null, fLite);
582599
}
583600
}
584601
return m_mapLocal;
@@ -1678,8 +1695,8 @@ protected synchronized void configureSynchronization(boolean fReload)
16781695
changeState(STATE_CONFIGURING);
16791696
m_ldtConnectionTimestamp = getSafeTimeMillis();
16801697

1681-
NamedCache cache = getCache();
1682-
Filter filter = getFilter();
1698+
NamedCache cache = getCache();
1699+
Filter filter = getFilter();
16831700
boolean fCacheValues = isCacheValues();
16841701

16851702
// get the old filters and listeners

prj/test/functional/cache/src/main/java/cache/CQCTests.java

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -29,10 +29,13 @@
2929
import com.tangosol.util.InvocableMap;
3030
import com.tangosol.util.MapEvent;
3131
import com.tangosol.util.MapListener;
32+
import com.tangosol.util.ObservableMap;
3233

3334
import com.tangosol.util.extractor.IdentityExtractor;
35+
import com.tangosol.util.extractor.KeyExtractor;
3436

3537
import com.tangosol.util.filter.AlwaysFilter;
38+
import com.tangosol.util.filter.BetweenFilter;
3639

3740
import com.tangosol.util.processor.AbstractProcessor;
3841

@@ -68,8 +71,10 @@
6871
import static org.hamcrest.CoreMatchers.nullValue;
6972

7073
import static org.junit.Assert.assertEquals;
74+
import static org.junit.Assert.assertFalse;
7175
import static org.junit.Assert.assertNull;
7276
import static org.junit.Assert.assertThat;
77+
import static org.junit.Assert.assertTrue;
7378
import static org.junit.Assert.fail;
7479

7580
/**
@@ -182,6 +187,98 @@ public void testEvents()
182187
Eventually.assertThat(invoking(listener).getActualTotal(), is(SOME_DATA));
183188
}
184189

190+
@Test
191+
public void testNoCacheValuesToObservable()
192+
{
193+
NamedCache<String, Integer> testCache = getAndPopulateNamedCache("dist-test");
194+
TestCQCListener listener = new ValidateLiteListener(SOME_DATA);
195+
ContinuousQueryCache<String, Integer, Integer> theCQC = setCQC(new ContinuousQueryCache<>(
196+
testCache,
197+
AlwaysFilter.INSTANCE,
198+
/*fCacheValues*/ false,
199+
listener,
200+
null));
201+
assertFalse(theCQC.isCacheValues());
202+
203+
TestCQCListener listener1 = new TestCQCListener(SOME_DATA);
204+
theCQC.addMapListener(listener1, (Filter)null, /*fLite*/ false);
205+
assertTrue(theCQC.isCacheValues());
206+
testCache.put("moreData", 4);
207+
theCQC.release();
208+
}
209+
210+
@Test
211+
public void testEventsNoValues()
212+
{
213+
// put data items into inner cache to generate events
214+
NamedCache<String, Integer> testCache = getAndPopulateNamedCache("dist-test");
215+
TestCQCListener listener = new ValidateLiteListener(SOME_DATA);
216+
ContinuousQueryCache<String, Integer, Integer> theCQC = setCQC(new ContinuousQueryCache<>(
217+
testCache,
218+
AlwaysFilter.INSTANCE,
219+
/*fCacheValues*/ false,
220+
listener,
221+
null));
222+
assertFalse(theCQC.isCacheValues());
223+
224+
225+
// check that the listener received the correct number of events
226+
// and the CQC has set its state to STATE_SYNCHRONIZED
227+
Eventually.assertThat(invoking(theCQC).getState(), is(ContinuousQueryCache.STATE_SYNCHRONIZED));
228+
Eventually.assertThat(invoking(listener).getActualTotal(), is(SOME_DATA));
229+
assertFalse(theCQC.isCacheValues());
230+
231+
// suspend service
232+
getFactory().ensureService("CQCTestService").shutdown();
233+
234+
// verify CQC has received disconnect event before restarting service
235+
Eventually.assertThat(invoking(theCQC).getState(), is(ContinuousQueryCache.STATE_DISCONNECTED));
236+
237+
// reset the listener
238+
listener.resetActualTotal();
239+
240+
// restart the service
241+
getFactory().ensureService("CQCTestService").start();
242+
243+
// ping the CQC for non-existing key to make it realize the cache needs restart
244+
theCQC.get("junkKey");
245+
246+
// check that the listener received the correct number of events after restart
247+
Eventually.assertThat(invoking(theCQC).getState(), is(ContinuousQueryCache.STATE_SYNCHRONIZED));
248+
Eventually.assertThat(invoking(listener).getActualTotal(), is(SOME_DATA));
249+
}
250+
251+
@Test
252+
public void testEventsNoValuesKeyFilter()
253+
{
254+
NamedCache<String, Integer> testCache = getAndPopulateNamedCache("dist-test");
255+
TestCQCListener listener = new ValidateLiteListener(SOME_DATA/2);
256+
ContinuousQueryCache<String, Integer, Integer> theCQCFirstHalf = setCQC(new ContinuousQueryCache<>(
257+
testCache,
258+
new BetweenFilter(new KeyExtractor(), "TestKey00", "TestKey49"),
259+
/*fCacheValues*/ false,
260+
listener,
261+
null));
262+
assertFalse(theCQCFirstHalf.isCacheValues());
263+
264+
TestCQCListener listener2 = new ValidateLiteListener(SOME_DATA/2);
265+
ContinuousQueryCache<String, Integer, Integer> theCQCSecondHalf = setCQC(new ContinuousQueryCache<>(
266+
testCache,
267+
new BetweenFilter(new KeyExtractor(), "TestKey50", "TestKey99"),
268+
/*fCacheValues*/ false,
269+
listener2,
270+
null));
271+
assertFalse(theCQCSecondHalf.isCacheValues());
272+
273+
int i = theCQCFirstHalf.get("TestKey09");
274+
275+
assertTrue(theCQCFirstHalf.size() <= 50);
276+
assertTrue(theCQCSecondHalf.size() <= 50);
277+
278+
Eventually.assertThat(invoking(listener).getActualTotal(), is(SOME_DATA/2));
279+
Eventually.assertThat(invoking(listener2).getActualTotal(), is(SOME_DATA/2));
280+
}
281+
185282
/**
186283
* Test the case where an Invoke inserts an entry into the cache
187284
* to ensure that the event contains the value being inserted.
@@ -736,7 +833,7 @@ protected NamedCache<String, Integer> getAndPopulateNamedCache(String sCacheName
736833
testCache.clear();
737834
for (int i = 0; i < SOME_DATA; i++)
738835
{
739-
testCache.put("TestKey" + i, i);
836+
testCache.put("TestKey" + String.format("%02d", i), i);
740837
}
741838
return testCache;
742839
}
@@ -961,6 +1058,44 @@ public void writeExternal(PofWriter out)
9611058
private String m_sValue;
9621059
}
9631060

1061+
// ----- inner class: ValidateLiteListener -------------------------------
1062+
1063+
/**
1064+
* MapListener that verifies the data received in the event.
1065+
*/
1066+
@SuppressWarnings("unused")
1067+
public class ValidateLiteListener extends TestCQCListener
1068+
{
1069+
// ----- constructors -----------------------------------------------
1070+
1071+
public ValidateLiteListener(int count)
1072+
{
1073+
super(count);
1074+
}
1075+
1076+
// ----- MapListener methods ----------------------------------------
1077+
1078+
public void entryUpdated(MapEvent evt)
1079+
{
1080+
super.entryUpdated(evt);
1081+
assertNull(evt.getNewValue());
1082+
assertNull(evt.getOldValue());
1083+
}
1084+
1085+
public void entryInserted(MapEvent evt)
1086+
{
1087+
super.entryInserted(evt);
1088+
assertNull(evt.getNewValue());
1089+
assertNull(evt.getOldValue());
1090+
}
1091+
1092+
public void entryDeleted(MapEvent evt)
1093+
{
1094+
super.entryDeleted(evt);
1095+
assertNull(evt.getOldValue());
1096+
}
1097+
}
1098+
9641099
/**
9651100
* MapListener that verifies the data received in the event.
9661101
*

prj/test/unit/coherence-tests/src/test/java/com/tangosol/net/cache/ContinuousQueryCacheTest.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2-
* Copyright (c) 2000, 2022, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.net.cache;
@@ -12,8 +12,11 @@
1212
import com.tangosol.net.NamedCache;
1313

1414
import com.oracle.coherence.testing.util.BaseMapTest;
15+
16+
import com.tangosol.util.Extractors;
1517
import com.tangosol.util.Filter;
1618
import com.tangosol.util.MapIndex;
19+
import com.tangosol.util.MapListener;
1720
import com.tangosol.util.ObservableMap;
1821
import com.tangosol.util.SimpleMapEntry;
1922
import com.tangosol.util.ValueExtractor;
@@ -878,6 +881,45 @@ public void testCoh2532()
878881
BaseMapTest.assertIdenticalMaps(expectedView, cacheCQC);
879882
}
880883

884+
/**
885+
* Ensure request for no local cache values is ignored when transformer is non-null.
886+
*/
887+
@Test
888+
public void testTransformerNoCacheValues()
889+
{
890+
NamedCache cacheBase = getNewCache("cqc-test");
891+
cacheBase.put(1, new Person("111-11-1111", "Homer", "Simpson", 1945, null, new String[0]));
892+
cacheBase.put(2, new Person("222-22-2222", "Marge", "Simpson", 1950, null, new String[0]));
893+
cacheBase.put(3, new Person("333-33-3333", "Bart", "Simpson", 1985, null, new String[0]));
894+
cacheBase.put(4, new Person("444-44-4444", "Lisa", "Simpson", 1987, null, new String[0]));
895+
896+
ContinuousQueryCache cacheCQC = new ContinuousQueryCache(cacheBase, AlwaysFilter.INSTANCE,
897+
/*fCacheValues*/false, (MapListener)null,
898+
Extractors.extract("firstName"));
899+
Assert.assertTrue(cacheCQC.isReadOnly());
900+
Assert.assertTrue("ensure non-null transformer results in locally cached values", cacheCQC.isCacheValues());
901+
902+
Map expectedView = new HashMap(5);
903+
expectedView.put(1, "Homer");
904+
expectedView.put(2, "Marge");
905+
expectedView.put(3, "Bart");
906+
expectedView.put(4, "Lisa");
907+
908+
// ensure transformed values are stored locally despite conflicting request for no local values
909+
for (Object oValue : cacheCQC.getInternalCache().values())
910+
{
911+
assertNotNull(oValue);
912+
}
913+
914+
BaseMapTest.assertIdenticalMaps(expectedView, cacheCQC);
915+
916+
cacheBase.put(5, new Person("555-55-5555", "Maggie", "Simpson", 2000, null, new String[0]));
917+
expectedView.put(5, "Maggie");
918+
System.out.println(cacheCQC);
919+
920+
BaseMapTest.assertIdenticalMaps(expectedView, cacheCQC);
921+
}
922+
881923
/**
882924
* Testable CQC extension that gives access to the index map.
883925
*/

0 commit comments

Comments
 (0)