Skip to content

Commit

Permalink
fix: eclipse-uprotocol#7 fixed unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
r-vanooyen committed Aug 8, 2024
1 parent ffe292c commit 846d430
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 82 deletions.
31 changes: 17 additions & 14 deletions src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@

class HiveMqMQTT5Client implements UTransport {

private static final Logger LOG = LoggerFactory.getLogger(HiveMqMQTT5Client.class);
private static final String USER_PROPERTIES_KEY_FOR_ID = "1";
private static final String USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE = "2";
private static final String USER_PROPERTIES_KEY_FOR_SOURCE_NAME = "3";
private static final String USER_PROPERTIES_KEY_FOR_SINK_NAME = "4";
private static final String USER_PROPERTIES_KEY_FOR_PRIORITY = "5";
private static final String USER_PROPERTIES_KEY_FOR_TTL = "6";
private static final String USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL = "7";
private static final String USER_PROPERTIES_KEY_FOR_COMMSTATUS = "8";
private static final String USER_PROPERTIES_KEY_FOR_REQID = "9";
private static final String USER_PROPERTIES_KEY_FOR_TOKEN = "10";
private static final String USER_PROPERTIES_KEY_FOR_TRACEPARENT = "11";
private static final String USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT = "12";
public static final Logger LOG = LoggerFactory.getLogger(HiveMqMQTT5Client.class);
public static final String USER_PROPERTIES_KEY_FOR_ID = "1";
public static final String USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE = "2";
public static final String USER_PROPERTIES_KEY_FOR_SOURCE_NAME = "3";
public static final String USER_PROPERTIES_KEY_FOR_SINK_NAME = "4";
public static final String USER_PROPERTIES_KEY_FOR_PRIORITY = "5";
public static final String USER_PROPERTIES_KEY_FOR_TTL = "6";
public static final String USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL = "7";
public static final String USER_PROPERTIES_KEY_FOR_COMMSTATUS = "8";
public static final String USER_PROPERTIES_KEY_FOR_REQID = "9";
public static final String USER_PROPERTIES_KEY_FOR_TOKEN = "10";
public static final String USER_PROPERTIES_KEY_FOR_TRACEPARENT = "11";
public static final String USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT = "12";
private final Mqtt5AsyncClient client;
private final UUri source;

Expand All @@ -69,18 +69,21 @@ public HiveMqMQTT5Client(UUri source, Mqtt5Client client) {
@Override
public CompletionStage<UStatus> send(UMessage uMessage) {
LOG.trace("should send a message:\n{}", uMessage);
CompletableFuture<UStatus> result = new CompletableFuture<>();

UAttributesValidator validator = UAttributesValidator.getValidator(uMessage.getAttributes());
ValidationResult validationResult = validator.validate(uMessage.getAttributes());
if (validationResult.isFailure()) {
throw new IllegalArgumentException("Invalid message attributes: " + validationResult);
}
if(uMessage.getAttributes().hasTtl() && uMessage.getAttributes().getTtl() < 500){
throw new IllegalArgumentException("TimeToLive needs to be at least 500ms. All smaller ttls will be dropped immediately by hiveMq");
}

Mqtt5UserProperties userProperties = buildUserProperties(uMessage.getAttributes());

Mqtt5PublishBuilder.Send.Complete<CompletableFuture<Mqtt5PublishResult>> sendHandle = buildMqttSendHandle(uMessage, userProperties);

CompletableFuture<UStatus> result = new CompletableFuture<>();
sendHandle
.send()
.whenCompleteAsync((mqtt5PublishResult, throwable) -> {
Expand Down
116 changes: 48 additions & 68 deletions src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import org.eclipse.uprotocol.communication.UPayload;
import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.uri.serializer.UriSerializer;
import org.eclipse.uprotocol.v1.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand All @@ -37,6 +42,8 @@
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SINK_NAME;
import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SOURCE_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -78,43 +85,27 @@ void setUp() {

@Test
void givenValidClientAndMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException {
UMessage message = UMessage.newBuilder()
.setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()))
.setAttributes(UAttributes.newBuilder()
.setId(UUID.newBuilder().build())
.setTtl(1000)
.setReqid(UUID.newBuilder().build())
.setToken("SomeToken")
.setTraceparent("someTraceParent")
.setSource(UUri.newBuilder()
.setAuthorityName("testSource.someUri.network")
.build())
.setSink(UUri.newBuilder()
.setAuthorityName("testDestination.someUri.network")
.build())
.build())
.build();
UMessage message = UMessageBuilder.request(
UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build(),
UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build(), 500)
.withToken("SomeToken")
.withTraceparent("someTraceParent")
.build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT));

UStatus response = serviceUnderTest.send(message).toCompletableFuture().join();
assertThat(response.getCode()).isEqualTo(UCode.OK);
Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get();

Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).orElseThrow();
assertThat(new String(receive.getPayloadAsBytes())).isEqualTo("Hello World");
}

@Test
void givenValidClientAndSmallestMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException {
UMessage message = UMessage.newBuilder()
.setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()))
.setAttributes(UAttributes.newBuilder()
.setId(UUID.newBuilder().build())
.setSource(UUri.newBuilder()
.setAuthorityName("testSource.someUri.network")
.build())
.setSink(UUri.newBuilder()
.setAuthorityName("testDestination.someUri.network")
.build())
.build())
.build();
UMessage message = UMessageBuilder.request(
UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build(),
UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build(), 500)
.build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT));

UStatus response = serviceUnderTest.send(message).toCompletableFuture().join();
assertThat(response.getCode()).isEqualTo(UCode.OK);
Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get();
Expand All @@ -124,15 +115,11 @@ void givenValidClientAndSmallestMessage_whenInvokeSend_shouldSendCorrectMessageT
@Test
@Disabled("Broadcast topic is not defined")
void givenValidClientAndBroadcastMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException {
UMessage message = UMessage.newBuilder()
.setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()))
.setAttributes(UAttributes.newBuilder()
.setId(UUID.newBuilder().build())
.setSource(UUri.newBuilder()
.setAuthorityName("testSource.someUri.network")
.build())
.build())
.build();
UMessage message = UMessageBuilder.publish(
UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build())
.build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT));


UStatus response = serviceUnderTest.send(message).toCompletableFuture().join();
assertThat(response.getCode()).isEqualTo(UCode.OK);
Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get();
Expand All @@ -145,15 +132,21 @@ void givenBlancoListener_whenAddingListenerAndReceivingMessages_shouldCallListen

UStatus status = serviceUnderTest.registerListener(null, listener).toCompletableFuture().join();

mqttClientForTests.publishWith().topic("a/some-source/c/d/e/some-sink/a/b/c").payload("Hello World".getBytes(Charset.defaultCharset())).send();
mqttClientForTests.publishWith().topic("a/some-source/c/d/e/some-sink/a/b/c")
.userProperties(Mqtt5UserProperties.of(
Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SOURCE_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build())),
Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SINK_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build()))
))
.payload("Hello World".getBytes(Charset.defaultCharset()))
.send();

assertThat(status.getCode()).isEqualTo(UCode.OK);

ArgumentCaptor<UMessage> captor = ArgumentCaptor.captor();
verify(listener, Mockito.timeout(1000).times(1)).onReceive(captor.capture());
assertThat(captor.getValue()).isNotNull();
assertThat(captor.getValue().getAttributes().getSink().getAuthorityName()).isEqualTo("some-sink");
assertThat(captor.getValue().getAttributes().getSource().getAuthorityName()).isEqualTo("some-source");
assertThat(captor.getValue().getAttributes().getSink().getAuthorityName()).isEqualTo("testDestination.someUri.network");
assertThat(captor.getValue().getAttributes().getSource().getAuthorityName()).isEqualTo("testSource.someUri.network");
assertThat(captor.getValue().getPayload()).isNotNull();
assertThat(captor.getValue().getPayload().toString(Charset.defaultCharset())).isEqualTo("Hello World");
}
Expand Down Expand Up @@ -332,14 +325,14 @@ void given2Listeners_whenUnregisterOneListener_shouldInvokeOtherListenersOnMessa
}

@Test
void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToListener() {
void givenListener_whenCloudSendsMessageToRadioAndRadioListens_shouldRouteMessageToRadio() {
//given a radio and a cloudService
UListener radioListener = mock(UListener.class);
UUri radioUuid = UUri.newBuilder()
.setAuthorityName("radio")
.setUeId(0xffff)
.setUeVersionMajor(0xff)
.setResourceId(0xffff)
.setResourceId(0)
.build();
UTransport mqttClientOfRadio = TransportFactory.createInstance(radioUuid, mqttClientForTests);

Expand All @@ -351,30 +344,19 @@ void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToList
.build();
UTransport mqttClientOfCloud = TransportFactory.createInstance(cloudService, mqttClientForTests);

mqttClientOfRadio.registerListener(
cloudService,
radioUuid,
radioListener);
mqttClientOfRadio.registerListener(cloudService, radioUuid, radioListener);

//when cloud service sends a message
mqttClientOfCloud.send(
UMessage.newBuilder()
.setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()))
.setAttributes(UAttributes.newBuilder()
.setId(UUID.newBuilder().setMsb(123L).build())
.setType(UMessageType.UMESSAGE_TYPE_NOTIFICATION)
.setSource(cloudService)
.setSink(radioUuid)
.setPriority(UPriority.UPRIORITY_CS0)
.setTtl(1000)
.setPermissionLevel(4211)
.setCommstatus(UCode.OK)
.setReqid(UUID.newBuilder().setMsb(456L).build())
.setToken("SomeToken")
.setTraceparent("someTraceParent")
.setPayloadFormat(UPayloadFormat.UPAYLOAD_FORMAT_TEXT)
.build())
.build());

UMessage message = UMessageBuilder.notification(cloudService, radioUuid)
.withPriority(UPriority.UPRIORITY_CS2)
.withTtl(1000)
.withPermissionLevel(4211)
.withToken("SomeToken")
.withTraceparent("someTraceParent")
.build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT));

mqttClientOfCloud.send(message);

//should be received by radio
ArgumentCaptor<UMessage> captor = ArgumentCaptor.captor();
Expand All @@ -383,15 +365,13 @@ void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToList
UMessage receivedMessage = captor.getValue();

assertThat(receivedMessage.getPayload().toString(Charset.defaultCharset())).isEqualTo("Hello World");
assertThat(receivedMessage.getAttributes().getId().getMsb()).isEqualTo(123L);
assertThat(receivedMessage.getAttributes().getType()).isEqualTo(UMessageType.UMESSAGE_TYPE_NOTIFICATION);
assertThat(receivedMessage.getAttributes().getSource().getAuthorityName()).isEqualTo("cloud");
assertThat(receivedMessage.getAttributes().getSink().getAuthorityName()).isEqualTo("radio");
assertThat(receivedMessage.getAttributes().getPriority()).isEqualTo(UPriority.UPRIORITY_CS0);
assertThat(receivedMessage.getAttributes().getPriority()).isEqualTo(UPriority.UPRIORITY_CS2);
assertThat(receivedMessage.getAttributes().getTtl()).isEqualTo(1000);
assertThat(receivedMessage.getAttributes().getPermissionLevel()).isEqualTo(4211);
assertThat(receivedMessage.getAttributes().getCommstatus()).isEqualTo(UCode.OK);
assertThat(receivedMessage.getAttributes().getReqid()).isEqualTo(UUID.newBuilder().setMsb(456L).build());
assertThat(receivedMessage.getAttributes().getToken()).isEqualTo("SomeToken");
assertThat(receivedMessage.getAttributes().getTraceparent()).isEqualTo("someTraceParent");
assertThat(receivedMessage.getAttributes().getPayloadFormat()).isEqualTo(UPayloadFormat.UPAYLOAD_FORMAT_TEXT);
Expand Down

0 comments on commit 846d430

Please sign in to comment.