Skip to content

Commit

Permalink
Merge pull request #260 from hivemq/fix/17211/metrics-cascade
Browse files Browse the repository at this point in the history
fix(17211) - Cascase bridge and adapter metrics on delete / stop
  • Loading branch information
simon622 authored Feb 28, 2024
2 parents 9e17256 + b9c4786 commit 04f4347
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
import com.codahale.metrics.MetricRegistry;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.metrics.HiveMQMetrics;
import dagger.internal.Preconditions;
import javassist.convert.TransformNew;

import javax.sound.midi.VoiceStatus;
import java.util.HashSet;
import java.util.Set;

public class PerBridgeMetrics {

public static final String BRIDGE_PREFIX = HiveMQMetrics.HIVEMQ_PREFIX + "bridge";

private final @NotNull Counter publishForwardSuccessCounter;
private final @NotNull Counter publishForwardFailCounter;
private final @NotNull Counter publishRemoteReceivedCounter;
Expand All @@ -34,54 +39,68 @@ public class PerBridgeMetrics {
private final @NotNull Counter remotePublishExcludedCounter;
private final @NotNull Counter loopPreventionForwardDropCounter;
private final @NotNull Counter loopPreventionRemoteDropCounter;
private final @NotNull Set<String> metricNames = new HashSet<>();
private final @NotNull Object mutex = new Object();

public PerBridgeMetrics(final @NotNull String bridgeName, final @NotNull MetricRegistry metricRegistry) {

publishForwardSuccessCounter =
metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX, bridgeName, "forward.publish", "count"));
publishForwardSuccessCounter = createBridgeCounter(metricRegistry,
bridgeName,
"forward.publish",
"count");

publishForwardFailCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
publishForwardFailCounter = createBridgeCounter(metricRegistry,
bridgeName,
"forward.publish.failed",
"count"));
"count");

publishRemoteReceivedCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
publishRemoteReceivedCounter = createBridgeCounter(metricRegistry,
bridgeName,
"remote.publish.received",
"count"));
"count");

publishLocalReceivedCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
publishLocalReceivedCounter = createBridgeCounter(metricRegistry,
bridgeName,
"local.publish.received",
"count"));
"count");

publishLocalSuccessCounter =
metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX, bridgeName, "local.publish", "count"));
publishLocalSuccessCounter = createBridgeCounter(metricRegistry,
bridgeName,
"local.publish",
"count");

publishLocalNoSubscriberCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
publishLocalNoSubscriberCounter = createBridgeCounter(metricRegistry,
bridgeName,
"local.publish.no-subscriber-present",
"count"));
"count");

publishLocalFailCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
publishLocalFailCounter = createBridgeCounter(metricRegistry,
bridgeName,
"local.publish.failed",
"count"));
"count");

remotePublishExcludedCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
remotePublishExcludedCounter = createBridgeCounter(metricRegistry,
bridgeName,
"forward.publish.excluded",
"count"));
"count");

loopPreventionForwardDropCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
loopPreventionForwardDropCounter = createBridgeCounter(metricRegistry,
bridgeName,
"forward.publish.loop-hops-exceeded",
"count"));
"count");

loopPreventionRemoteDropCounter = metricRegistry.counter(MetricRegistry.name(BRIDGE_PREFIX,
loopPreventionRemoteDropCounter = createBridgeCounter(metricRegistry,
bridgeName,
"remote.publish.loop-hops-exceeded",
"count"));
"count");
}

private Counter createBridgeCounter(final @NotNull MetricRegistry metricRegistry, @NotNull final String... names){
final String metricName = MetricRegistry.name(BRIDGE_PREFIX, names);
synchronized (mutex){
metricNames.add(metricName);
}
return metricRegistry.counter(metricName);
}

public @NotNull Counter getPublishForwardSuccessCounter() {
Expand Down Expand Up @@ -123,4 +142,12 @@ public PerBridgeMetrics(final @NotNull String bridgeName, final @NotNull MetricR
public @NotNull Counter getLoopPreventionRemoteDropCounter() {
return loopPreventionRemoteDropCounter;
}

public void clearAll(final @NotNull MetricRegistry metricRegistry){
Preconditions.checkNotNull(metricRegistry);
synchronized (mutex){
metricNames.forEach(metricRegistry::remove);
metricNames.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.hivemq.edge.modules.api.events.model.Event;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.security.ssl.SslUtil;
import io.reactivex.internal.operators.completable.CompletableDoFinally;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class BridgeMqttClient {
private final @NotNull ListeningExecutorService executorService;
private final @NotNull PerBridgeMetrics perBridgeMetrics;
private final @NotNull EventService eventService;
private final @NotNull MetricRegistry metricRegistry;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final @NotNull List<MqttForwarder> forwarders = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -98,6 +100,7 @@ public BridgeMqttClient(
this.bridge = bridge;
this.bridgeInterceptorHandler = bridgeInterceptorHandler;
this.eventService = eventService;
this.metricRegistry = metricRegistry;
this.mqtt5Client = createClient();
executorService = MoreExecutors.newDirectExecutorService();
perBridgeMetrics = new PerBridgeMetrics(bridge.getId(), metricRegistry);
Expand Down Expand Up @@ -282,8 +285,12 @@ private static List<Mqtt5Subscription> convertSubscriptions(
}

public void stop() {
stopped.set(true);
mqtt5Client.toAsync().disconnect();
try {
stopped.set(true);
mqtt5Client.toAsync().disconnect();
} finally {
perBridgeMetrics.clearAll(metricRegistry);
}
}

public @NotNull List<MqttForwarder> createForwarders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ protected void setRuntimeStatus(@NotNull final RuntimeStatus runtimeStatus){
this.runtimeStatus.set(runtimeStatus);
}

@Override
public void destroy() {
protocolAdapterMetricsHelper.clearAll();
}

protected boolean running(){
return runtimeStatus.get() == RuntimeStatus.STARTED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.hivemq.bootstrap.LoggingBootstrap;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.metrics.HiveMQMetrics;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

/**
* Ensures the adapters use consistent namespaces for the metrics so they can be derived
Expand All @@ -32,6 +38,8 @@ public class ProtocolAdapterMetricsHelper {
private @NotNull String protocolAdapterType;
private @NotNull String protocolAdapterId;
private @NotNull MetricRegistry metricRegistry;
private final @NotNull Set<String> metricNames = new HashSet<>();
private final Object mutex = new Object();
static final String SUCCESS_COUNT = "success.count";
static final String FAILED_COUNT = "failed.count";
static final String PERIOD = ".";
Expand All @@ -54,10 +62,10 @@ public ProtocolAdapterMetricsHelper(final @NotNull String protocolAdapterType,
}

protected void initRegistry(){
publishSuccessCounter = metricRegistry.counter(createAdapterMetricsNamespace("read.publish", true) + SUCCESS_COUNT);
publishFailedCounter = metricRegistry.counter(createAdapterMetricsNamespace("read.publish", true) + FAILED_COUNT);
connectionSuccessCounter = metricRegistry.counter(createAdapterMetricsNamespace("connection", true) + SUCCESS_COUNT);
connectionFailedCounter = metricRegistry.counter(createAdapterMetricsNamespace("connection", true) + FAILED_COUNT);
publishSuccessCounter = metricRegistry.counter(createAdapterMetricsNamespace("read.publish." + SUCCESS_COUNT));
publishFailedCounter = metricRegistry.counter(createAdapterMetricsNamespace("read.publish." + FAILED_COUNT) );
connectionSuccessCounter = metricRegistry.counter(createAdapterMetricsNamespace("connection." + SUCCESS_COUNT));
connectionFailedCounter = metricRegistry.counter(createAdapterMetricsNamespace("connection." + FAILED_COUNT));
}

/**
Expand Down Expand Up @@ -95,7 +103,19 @@ public void incrementConnectionSuccess(){
*/
public void increment(final @NotNull String metricName){
Preconditions.checkNotNull(metricName);
metricRegistry.counter(createAdapterMetricsNamespace(metricName, false)).inc();
metricRegistry.counter(createAdapterMetricsNamespace(metricName)).inc();
}

/**
* Will clear down all metrics in the registry created by this metrics helper.
* NB: metrics created outside the context of this helper will not be touched.
*/
public void clearAll(){
Preconditions.checkNotNull(metricRegistry);
synchronized (mutex){
metricNames.forEach(metricRegistry::remove);
metricNames.clear();
}
}

/**
Expand All @@ -104,20 +124,20 @@ public void increment(final @NotNull String metricName){
* Example format of the namespace:
* com.hivemq.edge.protocol-adapters.[test-type].[test-id].[suffix](.) with optional trailing period
* @param suffix - the suffix to append to the namespace
* @param trailingPeriod - should the namespace by suffixed with a trailing period
* @return a namespace string for use in the metrics registry
*/
protected String createAdapterMetricsNamespace(@NotNull final String suffix, final boolean trailingPeriod){
protected String createAdapterMetricsNamespace(@NotNull final String suffix){
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(HiveMQMetrics.PROTOCOL_ADAPTER_PREFIX);
stringBuilder.append(protocolAdapterType);
stringBuilder.append(PERIOD);
stringBuilder.append(protocolAdapterId);
stringBuilder.append(PERIOD);
stringBuilder.append(suffix);
if(trailingPeriod){
stringBuilder.append(PERIOD);
String metricName = stringBuilder.toString();
synchronized (mutex){
metricNames.add(metricName);
}
return stringBuilder.toString();
return metricName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public interface ProtocolAdapter {

@Nullable String getErrorMessage();

/**
* Called by the framework when the instance will be discarded
*/
default void destroy() {

}


enum RuntimeStatus {
STARTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ public boolean deleteAdapter(final String id) {
if (protocolAdapters.remove(id) != null) {
try {
synchronized(lock){
//ensure the instance releases any hard state
adapterInstance.get().getAdapter().destroy();
Map<String, Object> mainMap =
configurationService.protocolAdapterConfigurationService().getAllConfigs();
List<Map> adapterList = getAdapterListForType(adapterInstance.get().getAdapterInformation().getProtocolId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
public class ProtocolMetricsHelperTest {

static final String ARBITRARY_METRIC = "arbitrary-metric";

@Test
void testMetricsAdapterWrapperUpdatesRegistry() {

Expand All @@ -38,8 +40,29 @@ void testMetricsAdapterWrapperUpdatesRegistry() {
assertEquals(1, registry.getCounters().get("com.hivemq.edge.protocol-adapters.test-adapter-name.test-adapter-id.read.publish.failed.count").getCount(), "Matching failed data point should be incremented");
assertEquals(1, registry.getCounters().get("com.hivemq.edge.protocol-adapters.test-adapter-name.test-adapter-id.read.publish.success.count").getCount(), "Matching success data point should be incremented");

helper.increment("arbitrary-metric");
helper.increment(ARBITRARY_METRIC);
assertEquals(1, registry.getCounters().get("com.hivemq.edge.protocol-adapters.test-adapter-name.test-adapter-id.arbitrary-metric").getCount(), "Matching arbitrary data point should be incremented");

}

@Test
void testTearDownMetrics() {

MetricRegistry registry = new MetricRegistry();

//adapter helper creates 4 metrics
ProtocolAdapterMetricsHelper helper1 =
new ProtocolAdapterMetricsHelper("tear-down-name1","test-adapter-id", registry);

//add an arbitrary fifth
registry.counter(ARBITRARY_METRIC).inc();

assertEquals(5, registry.getMetrics().size(), "Number of metrics should match");

helper1.clearAll();

assertEquals(1, registry.getMetrics().size(), "Number of metrics should match");
assertEquals(1, registry.getCounters().get(ARBITRARY_METRIC).getCount(), "Matching success data point should be incremented");

}
}
46 changes: 46 additions & 0 deletions hivemq-edge/src/test/java/com/hivemq/bridge/BridgeMetricsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.hivemq.bridge;

import com.codahale.metrics.MetricRegistry;
import com.hivemq.bridge.metrics.PerBridgeMetrics;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* @author Simon L Johnson
*/
public class BridgeMetricsTest {

static final String ARBITRARY_METRIC = "arbitrary-metric";
@Test
void testTearDownMetrics() {

MetricRegistry registry = new MetricRegistry();

//adapter helper creates 10 metrics
PerBridgeMetrics perBridgeMetrics = new PerBridgeMetrics("bridge-name", registry);

assertEquals(10, registry.getMetrics().size(), "Number of metrics should match");

PerBridgeMetrics perBridgeMetrics2 = new PerBridgeMetrics("bridge-name2", registry);

assertEquals(20, registry.getMetrics().size(), "Number of metrics should match");

//add an arbitrary fifth
registry.counter(ARBITRARY_METRIC).inc();

assertEquals(21, registry.getMetrics().size(), "Number of metrics should match");

perBridgeMetrics.clearAll(registry);

assertEquals(11, registry.getMetrics().size(), "Number of metrics should match");

perBridgeMetrics2.clearAll(registry);

assertEquals(1, registry.getMetrics().size(), "Number of metrics should match");

assertEquals(1, registry.getCounters().get(ARBITRARY_METRIC).getCount(), "Matching success data point should be incremented");

}

}

0 comments on commit 04f4347

Please sign in to comment.