Skip to content

Commit a80428f

Browse files
authored
Detailed tasks information as metrics (#371)
* Add compaction stats, add metrics to the JobExecutor (but not exposed yet) and modify some methods to be more reusable
* Add parameter to allow disabling the gathering of extended metrics
* Add StreamInfo metrics
* Fix NPE
* Improve backwards compatibility
* Fix formatting
* Fix labelValues adder (defaultLabelValues were overwritten); add tests to verify that the methods do not error and that the end results have the correct labelValues and labelNames sizes
* Add missing license header
* Fix naming of _total metric
* Add more defensive checks
1 parent f3ef1f9 commit a80428f

File tree

11 files changed

+544
-12
lines changed

11 files changed

+544
-12
lines changed

management-api-agent-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@
6060
<version>${junit.version}</version>
6161
<scope>test</scope>
6262
</dependency>
63+
<dependency>
64+
<groupId>org.mockito</groupId>
65+
<artifactId>mockito-core</artifactId>
66+
<version>5.5.0</version>
67+
<scope>test</scope>
68+
</dependency>
6369
<dependency>
6470
<groupId>io.netty</groupId>
6571
<artifactId>netty-all</artifactId>

management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
import java.util.concurrent.CompletableFuture;
1111
import java.util.concurrent.ExecutorService;
1212
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ThreadPoolExecutor;
1314
import org.apache.cassandra.utils.Pair;
1415

1516
public class JobExecutor {
1617
ExecutorService executorService = Executors.newFixedThreadPool(1);
17-
Cache<String, Job> jobCache = CacheBuilder.newBuilder().maximumSize(1000).build();
18+
Cache<String, Job> jobCache = CacheBuilder.newBuilder().recordStats().maximumSize(1000).build();
1819

1920
public Pair<String, CompletableFuture<Void>> submit(String jobType, Runnable runnable) {
2021
// Where do I create the job details? Here? Add it to the Cache first?
@@ -45,4 +46,12 @@ public Pair<String, CompletableFuture<Void>> submit(String jobType, Runnable run
4546
public Job getJobWithId(String jobId) {
4647
return jobCache.getIfPresent(jobId);
4748
}
49+
50+
public int runningTasks() {
51+
return ((ThreadPoolExecutor) executorService).getActiveCount();
52+
}
53+
54+
public int queuedTasks() {
55+
return ((ThreadPoolExecutor) executorService).getQueue().size();
56+
}
4857
}

management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ public CassandraMetricNameParser(
4242
}
4343
}
4444

45+
public static CassandraMetricNameParser getDefaultParser(Configuration config) {
46+
return new CassandraMetricNameParser(
47+
CassandraMetricsTools.DEFAULT_LABEL_NAMES,
48+
CassandraMetricsTools.DEFAULT_LABEL_VALUES,
49+
config);
50+
}
51+
4552
private void parseEnvVariablesAsLabels(Map<String, String> envSettings) {
4653
for (Map.Entry<String, String> entry : envSettings.entrySet()) {
4754
String envValue = System.getenv(entry.getValue());

management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,7 @@ public class CassandraMetricRegistryListener implements MetricRegistryListener {
7777
public CassandraMetricRegistryListener(
7878
ConcurrentHashMap<String, RefreshableMetricFamilySamples> familyCache, Configuration config)
7979
throws NoSuchMethodException {
80-
parser =
81-
new CassandraMetricNameParser(
82-
CassandraMetricsTools.DEFAULT_LABEL_NAMES,
83-
CassandraMetricsTools.DEFAULT_LABEL_VALUES,
84-
config);
80+
parser = CassandraMetricNameParser.getDefaultParser(config);
8581
cache = new ConcurrentHashMap<>();
8682

8783
this.familyCache = familyCache;

management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ public class Configuration {
2121
@JsonProperty("labels")
2222
private LabelConfiguration labels;
2323

24+
@JsonProperty("extended_metrics_disabled")
25+
private boolean extendedDisabled;
26+
2427
public Configuration() {
2528
relabels = new ArrayList<>();
2629
}
@@ -44,4 +47,12 @@ public void setRelabels(List<RelabelSpec> relabels) {
4447
public void setLabels(LabelConfiguration labels) {
4548
this.labels = labels;
4649
}
50+
51+
public boolean isExtendedDisabled() {
52+
return extendedDisabled;
53+
}
54+
55+
public void setExtendedDisabled(boolean extendedDisabled) {
56+
this.extendedDisabled = extendedDisabled;
57+
}
4758
}

management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.k8ssandra.metrics.config.Configuration;
1010
import io.k8ssandra.metrics.http.NettyMetricsHttpServer;
1111
import io.k8ssandra.metrics.prometheus.CassandraDropwizardExports;
12+
import io.k8ssandra.metrics.prometheus.CassandraTasksExports;
1213
import io.netty.channel.EventLoopGroup;
1314
import io.netty.channel.epoll.EpollEventLoopGroup;
1415
import io.prometheus.client.hotspot.DefaultExports;
@@ -72,6 +73,11 @@ public static void intercept(@SuperCall Callable<Void> zuper) throws Exception {
7273
// Add JVM metrics
7374
DefaultExports.initialize();
7475

76+
// Add task metrics
77+
if (!config.isExtendedDisabled()) {
78+
new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register();
79+
}
80+
7581
// Create /metrics handler. Note, this doesn't support larger than nThreads=1
7682
final EventLoopGroup httpGroup = new EpollEventLoopGroup(1);
7783

@@ -81,12 +87,7 @@ public static void intercept(@SuperCall Callable<Void> zuper) throws Exception {
8187

8288
logger.info("Metrics collector started");
8389

84-
Runtime.getRuntime()
85-
.addShutdownHook(
86-
new Thread(
87-
() -> {
88-
httpGroup.shutdownGracefully();
89-
}));
90+
Runtime.getRuntime().addShutdownHook(new Thread(httpGroup::shutdownGracefully));
9091
} catch (Throwable t) {
9192
logger.error("Unable to start metrics endpoint", t);
9293
}

0 commit comments

Comments
 (0)