|
1 | 1 | /*
|
2 |
| - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. |
| 2 | + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. |
3 | 3 | *
|
4 | 4 | * Licensed under the Universal Permissive License v 1.0 as shown at
|
5 | 5 | * https://oss.oracle.com/licenses/upl.
|
@@ -296,7 +296,7 @@ public void asyncBacklog()
|
296 | 296 | AsynchronousProcessor[] aAsync = new AsynchronousProcessor[COUNT];
|
297 | 297 | for (int i = 0; true; i++)
|
298 | 298 | {
|
299 |
| - EntryProcessor proc = new UpdaterProcessor((ValueUpdater) null, |
| 299 | + EntryProcessor proc = new UpdateProcessorForBacklog((ValueUpdater) null, |
300 | 300 | Integer.valueOf(Base.getRandom().nextInt()));
|
301 | 301 | AsynchronousProcessor async = new AsynchronousProcessor(proc, i);
|
302 | 302 |
|
@@ -327,6 +327,7 @@ public synchronized void proceed(Object o)
|
327 | 327 |
|
328 | 328 | if (async.checkBacklog(contNormal))
|
329 | 329 | {
|
| 330 | + m_fBackLogAnnounced = true; |
330 | 331 | log("Backlog announced " + i);
|
331 | 332 |
|
332 | 333 | async.flush();
|
@@ -663,5 +664,53 @@ public Object process(Entry entry)
|
663 | 664 | return ctx.getBackingMapEntry(binKey).getValue();
|
664 | 665 | }
|
665 | 666 | }
|
| 667 | + |
| 668 | + /** |
| 669 | + * An UpdaterProcessor that ensures backlog. |
| 670 | + */ |
| 671 | + public static class UpdateProcessorForBacklog<K, V, T> |
| 672 | + extends UpdaterProcessor<K, V, T> |
| 673 | + { |
| 674 | + /** |
| 675 | + * Default constructor (necessary for the ExternalizableLite interface). |
| 676 | + */ |
| 677 | + public UpdateProcessorForBacklog() |
| 678 | + { |
| 679 | + } |
| 680 | + |
| 681 | + public UpdateProcessorForBacklog(ValueUpdater<V, T> updater, T value) |
| 682 | + { |
| 683 | + super(updater, value); |
| 684 | + } |
| 685 | + |
| 686 | + @Override |
| 687 | + public Boolean process(InvocableMap.Entry<K, V> entry) |
| 688 | + { |
| 689 | + ValueUpdater<V, T> updater = m_updater; |
| 690 | + |
| 691 | + // if there isn't backlog, sleep to slow down |
| 692 | + if (!m_fBackLogAnnounced) |
| 693 | + { |
| 694 | + Base.sleep(100); |
| 695 | + } |
| 696 | + |
| 697 | + if (updater == null) |
| 698 | + { |
| 699 | + //NOTE: a possibly unsafe cast from T to V |
| 700 | + entry.setValue((V) m_value, false); |
| 701 | + } |
| 702 | + else if (entry.isPresent()) |
| 703 | + { |
| 704 | + entry.update(updater, m_value); |
| 705 | + } |
| 706 | + else |
| 707 | + { |
| 708 | + return Boolean.FALSE; |
| 709 | + } |
| 710 | + return Boolean.TRUE; |
| 711 | + } |
| 712 | + } |
| 713 | + |
| 714 | + private static boolean m_fBackLogAnnounced = false; |
666 | 715 | }
|
667 | 716 |
|
0 commit comments