Skip to content

Commit 72ffa60

Browse files
committed
Enh 37387064 - [37381796->24.09.2] Topics: general refactoring and hardening
(merge ce/main -> ce/24.09 113428) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113429]
1 parent 2eae03f commit 72ffa60

File tree

3 files changed

+30
-18
lines changed

3 files changed

+30
-18
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,6 @@ protected PublisherChannelConnector<V> ensureRunningChannelConnector()
308308
{
309309
PublisherChannelConnector<V> connector = m_channelConnector;
310310
TopicService service = getTopicService();
311-
if (!service.isRunning())
312-
{
313-
System.out.println();
314-
}
315311
if (!service.isRunning() || connector == null || !connector.isActive())
316312
{
317313
f_lock.lock();

prj/coherence-core/src/main/java/com/oracle/coherence/common/base/Exceptions.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2020, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -99,6 +99,25 @@ else if (t.getClass().getName().equals("javax.ejb.EJBException"))
9999
}
100100
}
101101

102+
/**
103+
* Return the root cause of an exception.
104+
*
105+
* @param t the exception to find the root cause in
106+
*
107+
* @return the root cause of the exception
108+
*/
109+
public static Throwable getRootCause(Throwable t)
110+
{
111+
Throwable rootCause = t;
112+
Throwable cause = t.getCause();
113+
while (cause != null)
114+
{
115+
rootCause = cause;
116+
cause = cause.getCause();
117+
}
118+
return rootCause;
119+
}
120+
102121
/**
103122
* Re-throw the specified exception if it is a fatal unrecoverable exception.
104123
*

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -103,7 +103,7 @@
103103
*
104104
* @author Jonathan Knight 2024.11.26
105105
*/
106-
@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue"})
106+
@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue", "SimplifyStreamApiCallChains"})
107107
public class NamedTopicSubscriber<V>
108108
implements Subscriber<V>, SubscriberConnector.ConnectedSubscriber<V>, SubscriberStatistics, AutoCloseable
109109
{
@@ -414,7 +414,7 @@ public CompletableFuture<Map<Integer, CommitResult>> commitAsync(Map<Integer, Po
414414
.map(e -> commitInternal(e.getKey(), e.getValue(), mapResult))
415415
.toArray(CompletableFuture[]::new);
416416

417-
return CompletableFuture.allOf(aFuture).handle((_void, err) -> mapResult);
417+
return CompletableFuture.allOf(aFuture).thenApply(_void -> mapResult);
418418
}
419419

420420
/**
@@ -1379,16 +1379,13 @@ private CompletableFuture<CommitResult> commitInternal(int nChannel, Position po
13791379
{
13801380
TopicChannel channel = m_aChannel[nChannel];
13811381
return f_connector.commit(this, nChannel, position)
1382-
.handle((result, err) ->
1382+
.thenApply(result ->
13831383
{
1384-
if (err == null)
1384+
if (mapResult != null)
13851385
{
1386-
if (mapResult != null)
1387-
{
1388-
mapResult.put(nChannel, result);
1389-
}
1390-
channel.committed(position);
1386+
mapResult.put(nChannel, result);
13911387
}
1388+
channel.committed(position);
13921389
return result;
13931390
});
13941391
}
@@ -1458,7 +1455,7 @@ private Map<Integer, Position> seekInternal(Map<Integer, Position> mapPosition)
14581455

14591456
List<Integer> listUnallocated = mapPosition.keySet().stream()
14601457
.filter(c -> !isOwner(c))
1461-
.toList();
1458+
.collect(Collectors.toList());
14621459

14631460
if (!listUnallocated.isEmpty())
14641461
{
@@ -1501,7 +1498,7 @@ private Map<Integer, Position> seekInternalToTimestamps(Map<Integer, Instant> ma
15011498

15021499
List<Integer> listUnallocated = mapInstant.keySet().stream()
15031500
.filter(c -> !isOwner(c))
1504-
.toList();
1501+
.collect(Collectors.toList());
15051502

15061503
if (!listUnallocated.isEmpty())
15071504
{
@@ -1538,7 +1535,7 @@ private void ensureActiveAnOwnedChannels(int... anChannel)
15381535
List<Integer> listUnallocated = Arrays.stream(anChannel)
15391536
.filter(c -> !isOwner(c))
15401537
.boxed()
1541-
.toList();
1538+
.collect(Collectors.toList());
15421539

15431540
if (!listUnallocated.isEmpty())
15441541
{

0 commit comments

Comments
 (0)