Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DaemonThreadFactory to avoid hanging in the tehuti application #29

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/io/tehuti/metrics/stats/AsyncGauge.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.tehuti.metrics.Measurable;
import io.tehuti.metrics.MetricConfig;
import io.tehuti.metrics.NamedMeasurableStat;
import io.tehuti.utils.DaemonThreadFactory;
import io.tehuti.utils.RedundantLogFilter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -30,11 +31,10 @@ public class AsyncGauge implements NamedMeasurableStat {
private final Measurable measurable;

public static final AsyncGaugeConfig DEFAULT_ASYNC_GAUGE_CONFIG =
new AsyncGaugeConfig(Executors.newFixedThreadPool(10, r -> {
Thread thread = new Thread(r);
thread.setDaemon(true); // Set the thread as daemon
return thread;
}), TimeUnit.MINUTES.toMillis(1), 500);
new AsyncGaugeConfig(Executors.newFixedThreadPool(10,
gaojieliu marked this conversation as resolved.
Show resolved Hide resolved
new DaemonThreadFactory("Default_Async_Gauge_Executor")),
TimeUnit.MINUTES.toMillis(1),
500);

public AsyncGauge(Measurable measurable, String metricName) {
this.measurable = measurable;
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/io/tehuti/utils/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.tehuti.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;


public class DaemonThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber;
private final String namePrefix;

public DaemonThreadFactory(String threadNamePrefix) {
this.threadNumber = new AtomicInteger(1);
this.namePrefix = threadNamePrefix;
}

public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-t" + threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}

}
2 changes: 1 addition & 1 deletion src/main/java/io/tehuti/utils/RedundantLogFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class RedundantLogFilter {
private static RedundantLogFilter singleton;

private final int bitSetSize;
private final ScheduledExecutorService cleanerExecutor = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService cleanerExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("Redundant_Log_Filter"));

private BitSet activeBitset;
private BitSet oldBitSet;
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/io/tehuti/utils/RedundantLogFilterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.tehuti.utils;


public class RedundantLogFilterTest {

/**
* Junit test shutdown will force kill all the live threads (non-daemon), so that here we use a cmd application
* to validate whether {@link RedundantLogFilter} would stuck or not when {@link RedundantLogFilter#shutdown()} is
* not explicitly invoked.
*/
public static void main(String[] args) throws InterruptedException {
RedundantLogFilter logFilter = new RedundantLogFilter(10, 1000);
logFilter.isRedundantLog("test");
}
}
Loading