Skip to content

Commit da72ad0

Browse files
committed
Bug 36800725 - [36774493->24.03.1] Topics: New channels are not assigned to a subscriber when the count is increased
(merge ce/main -> ce/24.03 109931) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.03/": change = 109932]
1 parent 148af52 commit da72ad0

File tree

19 files changed

+888
-318
lines changed

19 files changed

+888
-318
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/daemon/queueProcessor/service/grid/partitionedService/partitionedCache/PagedTopic.java

+227-103
Large diffs are not rendered by default.

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeService/safeCacheService/safeDistributedCacheService/SafePagedTopicService.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22
/*
3-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
3+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
44
*
55
* Licensed under the Universal Permissive License v 1.0 as shown at
66
* https://oss.oracle.com/licenses/upl.
@@ -15,6 +15,7 @@
1515
import com.tangosol.internal.net.topic.impl.paged.PagedTopic;
1616
import com.tangosol.internal.net.topic.impl.paged.PagedTopicBackingMapManager;
1717
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
18+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicConfigMap;
1819
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
1920
import com.tangosol.net.PagedTopicService;
2021
import com.tangosol.net.TopicService;
@@ -320,6 +321,19 @@ public java.util.Set getSubscribers(String Param_1, com.tangosol.internal.net.to
320321
{
321322
return ((com.tangosol.net.PagedTopicService) getRunningCacheService()).getSubscribers(Param_1, Param_2);
322323
}
324+
325+
@Override
326+
public boolean hasSubscribers(String sTopicName)
327+
{
328+
return ((com.tangosol.net.PagedTopicService) getRunningCacheService()).hasSubscribers(sTopicName);
329+
}
330+
331+
@Override
332+
public long getSubscriptionCount(String sTopicName)
333+
{
334+
return ((com.tangosol.net.PagedTopicService) getRunningCacheService()).getSubscriptionCount(sTopicName);
335+
}
336+
323337
@Override
324338
public com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription getSubscription(long Param_1)
325339
{

prj/coherence-core/src/main/java/com/tangosol/coherence/config/scheme/PagedTopicScheme.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -449,13 +449,14 @@ public List<NamedEventInterceptorBuilder> getEventInterceptorBuilders()
449449
}
450450

451451
// add the subscriber expiry interceptor
452-
NamedEventInterceptorBuilder builder = new NamedEventInterceptorBuilder();
453-
builder.setOrder(Interceptor.Order.HIGH);
454-
builder.setName("$SubscriberExpiry$" + getServiceName());
455-
builder.setRegistrationBehavior(RegistrationBehavior.REPLACE);
456-
builder.setCustomBuilder((resolver, loader, listParameters) -> new PagedTopicSubscriber.TimeoutInterceptor());
452+
NamedEventInterceptorBuilder builderTimeout = new NamedEventInterceptorBuilder();
453+
builderTimeout.setOrder(Interceptor.Order.HIGH);
454+
builderTimeout.setName("$SubscriberExpiry$" + getServiceName());
455+
builderTimeout.setRegistrationBehavior(RegistrationBehavior.REPLACE);
456+
builderTimeout.setCustomBuilder((resolver, loader, listParameters) -> new PagedTopicSubscriber.TimeoutInterceptor());
457+
458+
list.add(builderTimeout);
457459

458-
list.add(builder);
459460
return list;
460461
}
461462

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java

-20
Original file line numberDiff line numberDiff line change
@@ -322,26 +322,6 @@ public int getChannelCount()
322322
return f_topicService.getChannelCount(f_sTopicName);
323323
}
324324

325-
/**
326-
* Return the default publisher channel count for this topic.
327-
* <p>
328-
* If the system property {@link Publisher#PROP_CHANNEL_COUNT} with a suffix of a dot
329-
* followed by the topic name is set, that value will be used.
330-
* <p>
331-
* If the system property {@link Publisher#PROP_CHANNEL_COUNT} with no suffix is set,
332-
* that value will be used.
333-
* <p>
334-
* If neither property is set the configured channel count will be used.
335-
*
336-
* @return the default publisher channel count for this topic
337-
*/
338-
public int getPublisherChannelCount()
339-
{
340-
int cChannel = getDependencies().getConfiguredChannelCount();
341-
cChannel = Config.getInteger(Publisher.PROP_CHANNEL_COUNT, cChannel);
342-
return Config.getInteger(Publisher.PROP_CHANNEL_COUNT + "." + f_sTopicName, cChannel);
343-
}
344-
345325
/**
346326
* Return the set of NotificationKeys covering all partitions for the given notifier
347327
*

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicConfigMap.java

+66-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77
package com.tangosol.internal.net.topic.impl.paged;
88

9+
import com.oracle.coherence.common.base.Logger;
10+
import com.tangosol.internal.net.topic.ChannelAllocationStrategy;
911
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
1012
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
1113
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
@@ -187,6 +189,69 @@ public static boolean hasSubscription(Map<?, ?> configMap, long lSubscriptionId,
187189
return false;
188190
}
189191

192+
/**
193+
* Returns {@code true} if the specified topic has any registered subscriptions.
194+
*
195+
* @param configMap the config map for the service
196+
* @param sTopic the name of the topic
197+
*
198+
* @return {@code true} if the specified topic has any registered subscriptions
199+
*/
200+
public static boolean hasSubscriptions(Map<?, ?> configMap, String sTopic)
201+
{
202+
return configMap.keySet().stream()
203+
.filter(key -> key instanceof PagedTopicSubscription.Key)
204+
.map(PagedTopicSubscription.Key.class::cast)
205+
.anyMatch(key -> key.getTopicName().equals(sTopic));
206+
}
207+
208+
/**
209+
* Returns the count of subscriptions for the specified topic.
210+
*
211+
* @param configMap the config map for the service
212+
* @param sTopic the name of the topic
213+
*
214+
* @return the count of subscriptions for the specified topic
215+
*/
216+
public static long getSubscriptionCount(Map<?, ?> configMap, String sTopic)
217+
{
218+
return configMap.keySet().stream()
219+
.filter(key -> key instanceof PagedTopicSubscription.Key)
220+
.map(PagedTopicSubscription.Key.class::cast)
221+
.filter(key -> key.getTopicName().equals(sTopic))
222+
.count();
223+
}
224+
225+
/**
226+
* Update the channel count and allocations for all subscriptions for a topic.
227+
*
228+
* @param configMap the config map to update
229+
* @param sTopic the name of the topic
230+
* @param cChannel the new channel count
231+
* @param strategy the channel allocation strategy
232+
*/
233+
@SuppressWarnings("unchecked")
234+
public static void setChannelCount(Map<?, ?> configMap, String sTopic, int cChannel, ChannelAllocationStrategy strategy)
235+
{
236+
Set<PagedTopicSubscription.Key> setKey = configMap.keySet().stream()
237+
.filter(key -> key instanceof PagedTopicSubscription.Key)
238+
.map(PagedTopicSubscription.Key.class::cast)
239+
.filter(key -> key.getTopicName().equals(sTopic))
240+
.collect(Collectors.toSet());
241+
242+
Map<PagedTopicSubscription.Key, PagedTopicSubscription> mapSub = (Map<PagedTopicSubscription.Key, PagedTopicSubscription>) configMap;
243+
for (PagedTopicSubscription.Key key : setKey)
244+
{
245+
PagedTopicSubscription subscription = (PagedTopicSubscription) configMap.get(key);
246+
if (subscription.getChannelCount() < cChannel)
247+
{
248+
Logger.config("Updating channel count for subscription " + key.getGroupName() + " in topic " + sTopic + " to " + cChannel);
249+
subscription.updateChannelAllocations(strategy, cChannel);
250+
mapSub.put(key, subscription);
251+
}
252+
}
253+
}
254+
190255
/**
191256
* Remove configurations for the specified topic name.
192257
*

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPartition.java

+17-10
Original file line numberDiff line numberDiff line change
@@ -247,21 +247,12 @@ public OfferProcessor.Result offerToPageTail(BinaryEntry<Page.Key, Page> entry,
247247
int nChannel = keyPage.getChannelId();
248248
long lPage = keyPage.getPageId();
249249
PagedTopicDependencies configuration = getDependencies();
250-
int channelCount = getChannelCount();
251250
int cbCapPage = configuration.getPageCapacity();
252251
long cbCapServer = configuration.getServerCapacity();
253252
List<Binary> listElements = processor.getElements();
254253
int nNotifyPostFull = processor.getNotifyPostFull();
255254
boolean fSealPage = processor.isSealPage();
256255

257-
if (nChannel >= channelCount)
258-
{
259-
// publisher tried to publish to a non-existent channel,
260-
// the publisher probably has an incorrect channel count
261-
throw new RequestIncompleteException("Invalid channel " + nChannel
262-
+ ", channel count is " + channelCount + ", valid channels are 0.." + (channelCount - 1));
263-
}
264-
265256
if (cbCapServer > 0 && nNotifyPostFull != 0)
266257
{
267258
// check if we own a large enough number of partitions that we would exceed the server capacity before
@@ -453,13 +444,16 @@ protected Page ensurePage(int nChannel, BinaryEntry<Page.Key, Page> entry)
453444
usage.setPartitionTail(lPage);
454445
usage.setPartitionMax(lPage); // unlike tail this is never reset to NULL_PAGE
455446

447+
boolean fFirstPage;
456448
if (lTailPrev == Page.NULL_PAGE)
457449
{
458450
// partition was empty, our new tail is also our head
459451
usage.setPartitionHead(lPage);
452+
fFirstPage = true;
460453
}
461454
else
462455
{
456+
fFirstPage = false;
463457
// attach old tail to new tail
464458
page.incrementReferenceCount(); // ref from old tail to new page
465459

@@ -471,7 +465,20 @@ protected Page ensurePage(int nChannel, BinaryEntry<Page.Key, Page> entry)
471465
}
472466

473467
// attach on behalf of waiting subscribers, they will have to find this page on their own
474-
page.adjustReferenceCount(usage.resetWaitingSubscriberCount());
468+
int c = usage.resetWaitingSubscriberCount();
469+
if (fFirstPage && c == 0)
470+
{
471+
// The Usage may have zero subscriptions if a Publisher has recently increased the channel count
472+
// In this case we check the service to see whether the topic has subscriptions
473+
String sCache = entry.getBackingMapContext().getCacheName();
474+
String sTopic = PagedTopicCaches.Names.getTopicName(sCache);
475+
long cSubscription = f_service.getSubscriptionCount(sTopic);
476+
if (cSubscription > 0L)
477+
{
478+
c = 1;//(int) cSubscription;
479+
}
480+
}
481+
page.adjustReferenceCount(c);
475482

476483
long nTime = getClusterTime();
477484
page.setTimestampHead(nTime);

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -116,7 +116,7 @@ public PagedTopicPublisher(NamedTopic<V> topic, PagedTopicCaches pagedTopicCache
116116

117117
ChannelCount channelCount = options.get(ChannelCount.class, ChannelCount.USE_CONFIGURED);
118118
int cChannel = channelCount.isUseConfigured()
119-
? pagedTopicCaches.getPublisherChannelCount()
119+
? topic.getChannelCount()
120120
: channelCount.getChannelCount();
121121

122122
long cbBatch = m_caches.getDependencies().getMaxBatchSizeBytes();

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -2381,9 +2381,14 @@ private void updateChannelOwnership(SortedSet<Integer> setChannel, boolean fLost
23812381
PagedTopicChannel[] aExistingChannel = m_aChannel;
23822382
if (nMaxChannel >= aExistingChannel.length)
23832383
{
2384-
int cChannel = nMaxChannel + 1;
2385-
// this subscriber has fewer channels than the server so needs to be resized
2386-
m_aChannel = initializeChannels(m_caches, cChannel, f_subscriberGroupId, aExistingChannel);
2384+
// This subscriber has fewer channels than the server so needs to be resized
2385+
// We disconnect as the subscription may not be properly initialized for
2386+
// the new channel count if this has happened due to a rolling upgrade
2387+
// from an earlier buggy topics version
2388+
Logger.finer(() -> String.format("Disconnecting subscriber %d on topic %s due to increase in channel count from %d to %d",
2389+
f_id.getId(), f_topic.getName(), aExistingChannel.length, nMaxChannel));
2390+
disconnectInternal(true);
2391+
return;
23872392
}
23882393

23892394
if (!Arrays.equals(m_aChannelOwned, aChannel))
@@ -2401,7 +2406,7 @@ private void updateChannelOwnership(SortedSet<Integer> setChannel, boolean fLost
24012406
}
24022407
setRevoked = Collections.unmodifiableSet(setRevoked);
24032408

2404-
Set<Integer> setAssigned = Collections.unmodifiableSet(new HashSet<>(setChannel));
2409+
Set<Integer> setAssigned = Set.copyOf(setChannel);
24052410

24062411
Logger.finest(String.format("Subscriber %d (name=%s) channel allocation changed, assigned=%s added=%s revoked=%s",
24072412
f_id.getId(), f_sIdentifyingName, setAssigned, setAdded, setRevoked));

prj/coherence-core/src/main/java/com/tangosol/net/PagedTopicService.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -9,6 +9,7 @@
99

1010
import com.tangosol.internal.net.topic.impl.paged.PagedTopicBackingMapManager;
1111

12+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicConfigMap;
1213
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
1314
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
1415
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
@@ -195,6 +196,24 @@ long ensureSubscription(String sTopicName, SubscriberGroupId groupId, Subscriber
195196
*/
196197
Set<SubscriberId> getSubscribers(String sTopicName, SubscriberGroupId groupId);
197198

199+
/**
200+
* Returns {@code true} if the specified topic has subscribers or subscriber groups.
201+
*
202+
* @param sTopicName the name of the topic
203+
*
204+
* @return {@code true} if the specified topic has subscribers or subscriber groups
205+
*/
206+
boolean hasSubscribers(String sTopicName);
207+
208+
/**
209+
* Returns the count of subscriptions for the specified topic.
210+
*
211+
* @param sTopicName the name of the topic
212+
*
213+
* @return the count of subscriptions for the specified topic
214+
*/
215+
long getSubscriptionCount(String sTopicName);
216+
198217
/**
199218
* Add a listener that will be notified when changes are made to topic subscriptions.
200219
*

0 commit comments

Comments
 (0)