Skip to content

Commit 65e7744

Browse files
committed
Enh 37387065 - [37381796->25.03] Topics: general refactoring and hardening
(merge main -> ce/main 113487) [git-p4: depot-paths = "//dev/coherence-ce/main/": change = 113488]
1 parent 5dde26c commit 65e7744

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.internal.net.topic.impl.paged;
99

10+
import com.oracle.coherence.common.base.Blocking;
1011
import com.oracle.coherence.common.base.Exceptions;
1112
import com.oracle.coherence.common.base.Logger;
1213

@@ -261,6 +262,25 @@ public void closeSubscription(ConnectedSubscriber<V> subscriber, boolean fDestro
261262
// finalizers in the JVM are not reliable that is probably not such a good idea.
262263
destroy(f_caches, f_subscriberGroupId, m_subscriptionId);
263264
}
265+
266+
// We need to ensure that the subscription has really gone.
267+
// During a fail-over situation the subscriber may still exist in the configmap
268+
// so we need to repeat the closure notification
269+
TopicSubscription subscription = getSubscription(subscriber, m_subscriptionId);
270+
while (subscription != null && subscription.getSubscriberTimestamp(f_subscriberId) != Long.MAX_VALUE)
271+
{
272+
try
273+
{
274+
Blocking.sleep(100);
275+
}
276+
catch (InterruptedException e)
277+
{
278+
break;
279+
}
280+
Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber);
281+
PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId);
282+
subscription = getSubscription(subscriber, m_subscriptionId);
283+
}
264284
}
265285

266286
@Override

prj/test/functional/topics/src/main/java/topics/AbstractTopicsStorageRecoveryTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public void shouldRecoverAfterCleanStorageRestart() throws Exception
237237
catch (Throwable t)
238238
{
239239
Logger.err("Error in publish loop");
240-
t.printStackTrace();
240+
Logger.err(t);
241241
}
242242
fPublished.set(true);
243243
};
@@ -296,7 +296,24 @@ optComplete, withIdentifyingName(sName)))
296296
{
297297
if (future != null && !future.isDone())
298298
{
299-
future.cancel(true);
299+
if (!future.cancel(true))
300+
{
301+
// the future has actually completed
302+
if (future.isDone() && !future.isCompletedExceptionally())
303+
{
304+
Subscriber.Element<Message> element = future.get();
305+
if (element != null)
306+
{
307+
mapReceived.put(element.getValue(), element);
308+
element.commit();
309+
cReceived.incrementAndGet();
310+
if (i >= 5)
311+
{
312+
fSubscribed.set(true);
313+
}
314+
}
315+
}
316+
}
300317
}
301318
if (!(t instanceof TimeoutException))
302319
{

0 commit comments

Comments
 (0)