From d87facddc9a06113cd3ac5e2a1998c312ec63c6d Mon Sep 17 00:00:00 2001 From: Chris Dennis Date: Wed, 20 Nov 2024 16:13:39 -0500 Subject: [PATCH] TDB-19133 : If invalidations are not claimed by reconnect clients, then don't attempt to fire invalidation completion messages --- .../server/store/ClusterTierActiveEntity.java | 123 ++++++++++-------- .../store/ClusterTierActiveEntityTest.java | 13 ++ 2 files changed, 83 insertions(+), 53 deletions(-) diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java b/clustered/server/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java index 7cbda2db25..413e4d5598 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java @@ -1,5 +1,6 @@ /* * Copyright Terracotta, Inc. + * Copyright Super iPaaS Integration LLC, an IBM Company 2024 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -381,21 +382,41 @@ private void invalidateAll(ClientDescriptor originatingClientDescriptor) { clientsToInvalidate.remove(originatingClientDescriptor); } - InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate); - clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); - LOGGER.debug("SERVER: requesting {} client(s) invalidation of all in cache {} (ID {})", clientsToInvalidate.size(), storeIdentifier, invalidationId); - for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { - LOGGER.debug("SERVER: asking client {} to invalidate all from cache {} (ID {})", clientDescriptorThatHasToInvalidate, storeIdentifier, invalidationId); - try { - clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateAll(invalidationId)); - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + + if (clientsToInvalidate.isEmpty()) { + invalidateAllComplete(originatingClientDescriptor, invalidationId); + } else { + InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate); + clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); + for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { + LOGGER.debug("SERVER: asking client {} to invalidate all from cache {} (ID {})", clientDescriptorThatHasToInvalidate, storeIdentifier, invalidationId); + try { + clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateAll(invalidationId)); + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); + } } } + } - if (clientsToInvalidate.isEmpty()) { - clientInvalidated(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + private void invalidateAllComplete(ClientDescriptor initiator, int invalidationId) { + try { + if (isStrong()) { + if (initiator != null) { + clientCommunicator.sendNoResponse(initiator, allInvalidationDone()); + LOGGER.debug("SERVER: notifying originating client that all other clients invalidated all in cache {} from {} (ID {})", storeIdentifier, initiator, invalidationId); + } + } else { + entityMessenger.messageSelf(new ClearInvalidationCompleteMessage()); + + InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); + if (invalidationTracker != null) { + invalidationTracker.setClearInProgress(false); + } + } + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); } } @@ -403,43 +424,18 @@ private void clientInvalidated(ClientDescriptor clientDescriptor, int invalidati InvalidationHolder invalidationHolder = clientsWaitingForInvalidation.get(invalidationId); if (invalidationHolder == null) { // Happens when client is re-sending/sending invalidations for which server has lost track since fail-over happened. - LOGGER.debug("Ignoring invalidation from client {} " + clientDescriptor); + LOGGER.debug("Ignoring invalidation from client {} ", clientDescriptor); return; } invalidationHolder.clientsHavingToInvalidate.remove(clientDescriptor); if (invalidationHolder.clientsHavingToInvalidate.isEmpty()) { if (clientsWaitingForInvalidation.remove(invalidationId) != null) { - try { - Long key = invalidationHolder.key; - if (key == null) { - if (isStrong()) { - clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, allInvalidationDone()); - LOGGER.debug("SERVER: notifying originating client that all other clients invalidated all in cache {} from {} (ID {})", storeIdentifier, clientDescriptor, invalidationId); - } else { - entityMessenger.messageSelf(new ClearInvalidationCompleteMessage()); - - InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); - if (invalidationTracker != null) { - invalidationTracker.setClearInProgress(false); - } - - } - } else { - if (isStrong()) { - clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, hashInvalidationDone(key)); - LOGGER.debug("SERVER: notifying originating client that all other clients invalidated key {} in cache {} from {} (ID {})", key, storeIdentifier, clientDescriptor, invalidationId); - } else { - entityMessenger.messageSelf(new InvalidationCompleteMessage(key)); - - InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); - if (invalidationTracker != null) { - invalidationTracker.untrackHashInvalidation(key); - } - } - } - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + Long key = invalidationHolder.key; + if (key == null) { + invalidateAllComplete(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + } else { + invalidateComplete(invalidationHolder.clientDescriptorWaitingForInvalidation, key, invalidationId); } } } @@ -452,21 +448,42 @@ private void invalidateHashForClient(ClientDescriptor originatingClientDescripto clientsToInvalidate.remove(originatingClientDescriptor); } - InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate, key); - clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); - LOGGER.debug("SERVER: requesting {} client(s) invalidation of hash {} in cache {} (ID {})", clientsToInvalidate.size(), key, storeIdentifier, invalidationId); - for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { - LOGGER.debug("SERVER: asking client {} to invalidate hash {} from cache {} (ID {})", clientDescriptorThatHasToInvalidate, key, storeIdentifier, invalidationId); - try { - clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateHash(key, invalidationId)); - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + + if (clientsToInvalidate.isEmpty()) { + invalidateComplete(originatingClientDescriptor, key, invalidationId); + } else { + InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate, key); + clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); + + for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { + LOGGER.debug("SERVER: asking client {} to invalidate hash {} from cache {} (ID {})", clientDescriptorThatHasToInvalidate, key, storeIdentifier, invalidationId); + try { + clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateHash(key, invalidationId)); + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); + } } } + } - if (clientsToInvalidate.isEmpty()) { - clientInvalidated(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + private void invalidateComplete(ClientDescriptor initiator, long key, int invalidationId) { + try { + if (isStrong()) { + if (initiator != null) { + clientCommunicator.sendNoResponse(initiator, hashInvalidationDone(key)); + LOGGER.debug("SERVER: notifying originating client that all other clients invalidated key {} in cache {} from {} (ID {})", key, storeIdentifier, initiator, invalidationId); + } + } else { + entityMessenger.messageSelf(new InvalidationCompleteMessage(key)); + + InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); + if (invalidationTracker != null) { + invalidationTracker.untrackHashInvalidation(key); + } + } + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); } } diff --git a/clustered/server/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java b/clustered/server/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java index dc4c572145..078121bfb6 100644 --- a/clustered/server/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java +++ b/clustered/server/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java @@ -1,5 +1,6 @@ /* * Copyright Terracotta, Inc. + * Copyright Super iPaaS Integration LLC, an IBM Company 2024 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -942,6 +943,18 @@ public void testLoadExistingRecoversInflightInvalidationsForEventualCache() thro verify(clientCommunicator, times(10)).sendNoResponse(ArgumentMatchers.eq(client), ArgumentMatchers.isA(EhcacheEntityResponse.ClientInvalidateHash.class)); } + @Test + public void testInvalidationHandlingOnReconnectWindowTimeoutClosure() throws Exception { + ClusterTierActiveEntity activeEntity = new ClusterTierActiveEntity(defaultRegistry, defaultConfiguration, DEFAULT_MAPPER, SYNC_GETS_EXECUTOR); + EhcacheStateServiceImpl ehcacheStateService = defaultRegistry.getStoreManagerService(); + ehcacheStateService.createStore(defaultStoreName, defaultStoreConfiguration, false); //Passive would have done this before failover + + InvalidationTracker invalidationTracker = ehcacheStateService.getInvalidationTracker(defaultStoreName); + invalidationTracker.trackHashInvalidation(1L); + + activeEntity.startReconnect().close(); + } + @Test @SuppressWarnings("unchecked") public void testReplicationMessageAndOriginalServerStoreOpMessageHasSameConcurrency() throws Exception {