Skip to content

Commit f8f02cc

Browse files
committed
Bug 38360810 - [38298007->15.1.1.0.0] make RefreshableAddressProvider use an executor for the refresh thread
(merge 15.1.1-0 -> ce/15.1.1-0 118491) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v15.1.1.0/": change = 118492]
1 parent 963ed89 commit f8f02cc

File tree

4 files changed

+118
-31
lines changed

4 files changed

+118
-31
lines changed

prj/coherence-core-21/src/main/java/com/tangosol/internal/util/VirtualThreads.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
import com.tangosol.net.security.SecurityHelper;
1515

16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ThreadFactory;
19+
1620
import java.util.function.Function;
1721

1822
/**
@@ -94,6 +98,21 @@ public static boolean isEnabled(String sServiceName)
9498
: Config.getBoolean(PROPERTY_SERVICE_ENABLED.apply(sServiceName), isEnabled());
9599
}
96100

101+
102+
/**
103+
* Returns either a new virtual thread per-task executor on Java 21 or higher
104+
* or a single threaded executor if lower than Java 21.
105+
*
106+
* @param factory the {@link ThreadFactory} to use if not on Java 21
107+
*
108+
* @return either a new virtual thread per-task executor on Java 21 or higher
109+
* or a single threaded executor if lower than Java 21.
110+
*/
111+
public static Executor newMaybeVirtualThreadExecutor(ThreadFactory factory)
112+
{
113+
return Executors.newVirtualThreadPerTaskExecutor();
114+
}
115+
97116
// ---- constants -------------------------------------------------------
98117

99118
/**

prj/coherence-core/src/main/java/com/tangosol/internal/util/VirtualThreads.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99

1010
import com.tangosol.util.Base;
1111

12+
import java.util.concurrent.Executor;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ThreadFactory;
15+
1216
/**
1317
* Helper class for virtual threads functionality.
1418
* <p>
@@ -68,4 +72,18 @@ public static boolean isEnabled(String serviceName)
6872
{
6973
return false;
7074
}
75+
76+
/**
77+
* Returns either a new virtual thread per-task executor on Java 21 or higher
78+
* or a single threaded executor if lower than Java 21.
79+
*
80+
* @param factory the {@link ThreadFactory} to use if not on Java 21
81+
*
82+
* @return either a new virtual thread per-task executor on Java 21 or higher
83+
* or a single threaded executor if lower than Java 21.
84+
*/
85+
public static Executor newMaybeVirtualThreadExecutor(ThreadFactory factory)
86+
{
87+
return Executors.newSingleThreadExecutor(factory);
88+
}
7189
}

prj/coherence-core/src/main/java/com/tangosol/net/RefreshableAddressProvider.java

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
/*
2-
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.net;
99

1010

1111
import com.tangosol.coherence.config.Config;
1212

13+
import com.tangosol.internal.util.VirtualThreads;
14+
1315
import com.tangosol.util.Base;
14-
import com.tangosol.util.Daemon;
1516

1617
import java.net.InetSocketAddress;
1718

1819
import java.util.ArrayList;
1920
import java.util.Iterator;
2021
import java.util.List;
2122

23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
26+
2227

2328
/**
2429
* A RefreshableAddressProvider is an AddressProvider implementation
@@ -57,7 +62,9 @@ public RefreshableAddressProvider(AddressProvider ap, long lRefresh)
5762
// populate the initial address list cache
5863
refreshAddressList();
5964
// initialize the refresh thread
60-
f_daemonRefresh = new RefreshThread(lRefresh);
65+
String sName = RefreshableAddressProvider.this.getClass().getName() + ": RefreshThread";
66+
f_refreshTask = new RefreshTask(lRefresh);
67+
f_daemonRefresh = VirtualThreads.newMaybeVirtualThreadExecutor(r -> new Thread(r, sName));
6168
}
6269

6370
/**
@@ -178,54 +185,74 @@ protected void refreshAddressList()
178185
*/
179186
protected void ensureRefreshThread()
180187
{
181-
if (!f_daemonRefresh.isRunning())
188+
if (!f_refreshTask.isRunning())
182189
{
183-
f_daemonRefresh.start();
190+
f_daemonRefresh.execute(f_refreshTask);
184191
}
185192
}
186193

187194

188195
// ----- inner class: RefreshThread -------------------------------------
189196

190-
protected class RefreshThread
191-
extends Daemon
197+
protected class RefreshTask
198+
implements Runnable
192199
{
193200
// ----- constructors -----------------------------------------------
194201

195202
/**
196-
* Construct a new RefreshThread with the specified refresh interval.
203+
* Construct a new RefreshTask with the specified refresh interval.
197204
*
198205
* @param lRefresh the refresh interval
199206
*/
200-
protected RefreshThread(long lRefresh)
207+
protected RefreshTask(long lRefresh)
201208
{
202-
super(RefreshableAddressProvider.this.getClass().getName() +
203-
": RefreshThread");
204209
f_lRefresh = lRefresh;
205210
}
206211

207-
/**
208-
* {@inheritDoc}
209-
*/
212+
public boolean isRunning()
213+
{
214+
return m_fRunning;
215+
}
216+
217+
@Override
210218
public void run()
211219
{
212-
long lRefresh = f_lRefresh;
213-
while (!isStopping())
220+
if (m_fRunning)
214221
{
215-
try
216-
{
217-
refreshAddressList();
218-
stop();
219-
}
220-
catch (Throwable t)
222+
return;
223+
}
224+
225+
f_lock.lock();
226+
try
227+
{
228+
if (!m_fRunning)
221229
{
222-
err("An exception occurred while refreshing an address list: " +
223-
"\n" + getStackTrace(t) +
224-
"\nReducing the refresh rate.");
225-
Base.sleep(lRefresh);
226-
lRefresh = 2 * lRefresh;
230+
m_fRunning = true;
231+
long lRefresh = f_lRefresh;
232+
while (m_fRunning)
233+
{
234+
try
235+
{
236+
refreshAddressList();
237+
m_fRunning = false;
238+
}
239+
catch (Throwable t)
240+
{
241+
err("An exception occurred while refreshing an address list: " +
242+
"\n" + t.getMessage() +
243+
"\n" + getStackTrace(t) +
244+
"\nReducing the refresh rate.");
245+
Base.sleep(lRefresh);
246+
lRefresh = 2 * lRefresh;
247+
}
248+
}
227249
}
228250
}
251+
finally
252+
{
253+
m_fRunning = false;
254+
f_lock.unlock();
255+
}
229256
}
230257

231258
// ----- data members -----------------------------------------------
@@ -234,6 +261,16 @@ public void run()
234261
* The interval with which to attempt to refresh the address list.
235262
*/
236263
protected final long f_lRefresh;
264+
265+
/**
266+
* The flag to indicate whether the task is running.
267+
*/
268+
private volatile boolean m_fRunning = false;
269+
270+
/**
271+
* A lock to control running only a single task.
272+
*/
273+
private final Lock f_lock = new ReentrantLock();
237274
}
238275

239276

@@ -357,7 +394,15 @@ protected void refreshIterator()
357394
/**
358395
* The refresh daemon.
359396
*/
360-
protected final Daemon f_daemonRefresh;
397+
protected final RefreshTask f_refreshTask;
398+
399+
/**
400+
* The refresh executor.
401+
* <p>
402+
* This will be a single threaded executor on Java 17 or a
403+
* virtual thread per-task on Java 21 or higher.
404+
*/
405+
protected final Executor f_daemonRefresh;
361406

362407
/**
363408
* An Iterator over the cached set of addresses.

prj/coherence-core/src/main/java/com/tangosol/util/Daemon.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,10 @@ protected void configureWorker(DaemonWorker worker)
750750
ThreadGroup curThreadGroup = ensureThreadGroup();
751751
synchronized (curThreadGroup) // ensures that the thread group is not destroyed concurrently
752752
{
753-
ensureThreadGroup();
753+
if (curThreadGroup.isDestroyed())
754+
{
755+
ensureThreadGroup();
756+
}
754757
threadWorker = makeThread(m_threadGroup, worker, null);
755758
}
756759

@@ -781,9 +784,11 @@ protected void configureWorker(DaemonWorker worker)
781784
protected ThreadGroup ensureThreadGroup()
782785
{
783786
ThreadGroup threadGroup = m_threadGroup;
784-
if (threadGroup == null)
787+
if (threadGroup == null || threadGroup.isDestroyed())
785788
{
786789
threadGroup = m_threadGroup = new ThreadGroup(getConfiguredName());
790+
// Make it a daemon so that it is destroyed automatically.
791+
threadGroup.setDaemon(true);
787792
}
788793
return threadGroup;
789794
}

0 commit comments

Comments
 (0)