diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java index a952cf542c..06fb6a02a9 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java @@ -27,13 +27,12 @@ public class MqttPublishConfigBuilder extends ConfigBuilder params; + private List params; MqttPublishConfigBuilder( Function mapper) { this.mapper = mapper; - this.params = new ArrayList<>(); } @Override @@ -53,6 +52,10 @@ public MqttPublishConfigBuilder topic( public MqttPublishConfigBuilder param( MqttTopicParamConfig param) { + if (this.params == null) + { + this.params = new ArrayList<>(); + } this.params.add(param); return this; } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java index a1b4124c14..d6aa8963f9 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java @@ -27,13 +27,12 @@ public class MqttSubscribeConfigBuilder extends ConfigBuilder params; + private List params; MqttSubscribeConfigBuilder( Function mapper) { this.mapper = mapper; - this.params = new ArrayList<>(); } @Override @@ -53,6 +52,10 @@ public MqttSubscribeConfigBuilder topic( public MqttSubscribeConfigBuilder param( MqttTopicParamConfig param) { + if (this.params == null) + { + this.params = new ArrayList<>(); + } this.params.add(param); return this; } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java index d577748cd0..02813e2018 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization)) + .filter(r -> r.authorized(authorization) && r.matchesSubscribe(authorization, topic)) .findFirst() .orElse(null); } @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization)) + .filter(r -> r.authorized(authorization) && r.matchesPublish(authorization, topic)) .findFirst() .orElse(null); } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java index f165079cb3..05e22c1bca 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java @@ -75,17 +75,14 @@ public JsonObject adaptToJson( { JsonArrayBuilder subscribes = Json.createArrayBuilder(); - mqttCondition.subscribes.forEach(s -> + mqttCondition.subscribes.forEach(sub -> { JsonObjectBuilder subscribeJson = Json.createObjectBuilder(); - subscribeJson.add(TOPIC_NAME, s.topic); - if (!s.params.isEmpty()) + subscribeJson.add(TOPIC_NAME, sub.topic); + if (sub.params != null && !sub.params.isEmpty()) { JsonObjectBuilder params = Json.createObjectBuilder(); - s.params.forEach(par -> - { - params.add(par.name, par.value); - }); + sub.params.forEach(p -> params.add(p.name, p.value)); subscribeJson.add(PARAMS_NAME, params); } subscribes.add(subscribeJson); @@ -97,17 +94,14 @@ public JsonObject adaptToJson( { JsonArrayBuilder publishes = Json.createArrayBuilder(); - mqttCondition.publishes.forEach(p -> + mqttCondition.publishes.forEach(pub -> { JsonObjectBuilder publishJson = Json.createObjectBuilder(); - publishJson.add(TOPIC_NAME, p.topic); - if (!p.params.isEmpty()) + publishJson.add(TOPIC_NAME, pub.topic); + if (pub.params != null && !pub.params.isEmpty()) { JsonObjectBuilder params = Json.createObjectBuilder(); - p.params.forEach(par -> - { - params.add(par.name, par.value); - }); + pub.params.forEach(p -> params.add(p.name, p.value)); publishJson.add(PARAMS_NAME, params); } publishes.add(publishJson); diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java index e0e2481a56..a8954264a2 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java @@ -18,16 +18,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.agrona.collections.ObjLongPredicate; +import org.agrona.collections.LongObjPredicate; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig; -import io.aklivity.zilla.runtime.engine.config.GuardedConfig; public final class MqttConditionMatcher { @@ -35,12 +33,12 @@ public final class MqttConditionMatcher Pattern.compile("\\$\\{guarded(?:\\['([a-zA-Z]+[a-zA-Z0-9\\._\\:\\-]*)'\\]).identity\\}"); private final List sessionMatchers; - private final List> subscribeMatchPredicates; - private final List> publishMatchPredicates; + private final List> subscribeMatchPredicates; + private final List> publishMatchPredicates; public MqttConditionMatcher( - MqttConditionConfig condition, - List guarded) + MqttRouteConfig route, + MqttConditionConfig condition) { this.sessionMatchers = condition.sessions != null && !condition.sessions.isEmpty() ? @@ -49,11 +47,11 @@ public MqttConditionMatcher( Matcher identityMatcher = IDENTITY_PATTERN.matcher(""); this.subscribeMatchPredicates = condition.subscribes != null && !condition.subscribes.isEmpty() ? - condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, guarded, identityMatcher)) + condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) .collect(Collectors.toList()) : null; this.publishMatchPredicates = condition.publishes != null && !condition.publishes.isEmpty() ? - condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, guarded, identityMatcher)) + condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) .collect(Collectors.toList()) : null; } @@ -76,15 +74,15 @@ public boolean matchesSession( } public boolean matchesSubscribe( - String topic, - long authorization) + long authorization, + String topic) { boolean match = false; if (subscribeMatchPredicates != null) { - for (ObjLongPredicate predicate : subscribeMatchPredicates) + for (LongObjPredicate predicate : subscribeMatchPredicates) { - match = predicate.test(topic, authorization); + match = predicate.test(authorization, topic); if (match) { break; @@ -95,15 +93,15 @@ public boolean matchesSubscribe( } public boolean matchesPublish( - String topic, - long authorization) + long authorization, + String topic) { boolean match = false; if (publishMatchPredicates != null) { - for (ObjLongPredicate predicate : publishMatchPredicates) + for (LongObjPredicate predicate : publishMatchPredicates) { - match = predicate.test(topic, authorization); + match = predicate.test(authorization, topic); if (match) { break; @@ -132,10 +130,10 @@ private static List asWildcardMatcher( return matchers; } - private static ObjLongPredicate asTopicMatchPredicate( + private static LongObjPredicate asTopicMatchPredicate( String wildcard, List params, - List guarded, + MqttRouteConfig route, Matcher identityMatcher ) { @@ -147,50 +145,77 @@ private static ObjLongPredicate asTopicMatchPredicate( .replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher(""); Collection topicParams = topicMatcher.namedGroups().keySet(); - params.stream() - .filter(p -> !topicParams.contains(p.name)) - .forEach(p -> System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, p.name)); topicParams.stream() - .filter(tp -> params.stream().noneMatch(p -> p.name.equals(tp))) + .filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp))) .forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp)); - return params.isEmpty() - ? (topic, auth) -> topicMatcher.reset(topic).matches() - : (topic, auth) -> matchesWithParams(topic, params, topicMatcher, identityMatcher, auth, guarded); + LongObjPredicate topicMatchPredicate; + if (params == null || params.isEmpty()) + { + topicMatchPredicate = (auth, topic) -> topicMatcher.reset(topic).matches(); + } + else + { + List guardedIdentityParams = new ArrayList<>(); + List constantParams = new ArrayList<>(); + for (MqttTopicParamConfig param : params) + { + if (!topicParams.contains(param.name)) + { + System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, param.name); + } + if (identityMatcher.reset(param.value).matches()) + { + String guard = identityMatcher.group(1); + guardedIdentityParams.add(new GuardedIdentityParam(param.name, guard)); + } + else + { + constantParams.add(param); + } + } + topicMatchPredicate = (auth, topic) -> + matchesWithParams(auth, topic, topicMatcher, route, guardedIdentityParams, constantParams); + } + + return topicMatchPredicate; } private static boolean matchesWithParams( + long authorization, String topic, - List params, Matcher topicMatcher, - Matcher identityMatcher, - long authorization, - List guarded) + MqttRouteConfig route, + List guardedIdentityParams, + List constantParams) { boolean match = topicMatcher.reset(topic).matches(); if (match) { - for (MqttTopicParamConfig param : params) + for (GuardedIdentityParam param : guardedIdentityParams) { - String value = param.value; - identityMatcher.reset(value); - if (identityMatcher.find()) + String identity = route.identity(param.guard, authorization); + try + { + match = topicMatcher.group(param.name).equals(identity); + } + catch (IllegalArgumentException e) { - String guardName = identityMatcher.group(1); - Optional identity = guarded.stream() - .filter(g -> guardName.equals(g.name)) - .findFirst() - .map(g -> g.identity.apply(authorization)); - if (identity.isEmpty()) - { - match = false; - break; - } - value = identityMatcher.replaceAll(identity.get()); + match = false; } + if (!match) + { + break; + } + } + } + if (match) + { + for (MqttTopicParamConfig param : constantParams) + { try { - match = topicMatcher.group(param.name).equals(value); + match = topicMatcher.group(param.name).equals(param.value); } catch (IllegalArgumentException e) { @@ -204,4 +229,10 @@ private static boolean matchesWithParams( } return match; } + + private record GuardedIdentityParam( + String name, + String guard) + { + } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java index c77cdfaaee..2924ffbe3e 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java @@ -17,8 +17,11 @@ import static io.aklivity.zilla.runtime.engine.config.WithConfig.NO_COMPOSITE_ID; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; import java.util.function.LongPredicate; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; @@ -32,6 +35,7 @@ public final class MqttRouteConfig private final List when; private final MqttWithConfig with; private final LongPredicate authorized; + private final Map> identities; public MqttRouteConfig( RouteConfig route) @@ -39,10 +43,12 @@ public MqttRouteConfig( this.id = route.id; this.when = route.when.stream() .map(MqttConditionConfig.class::cast) - .map(conf -> new MqttConditionMatcher(conf, route.guarded)) + .map(c -> new MqttConditionMatcher(this, c)) .collect(toList()); this.with = (MqttWithConfig) route.with; this.authorized = route.authorized; + this.identities = route.guarded.stream() + .collect(toMap(g -> g.name, g -> g.identity)); } public long compositeId() @@ -56,6 +62,13 @@ boolean authorized( return authorized.test(authorization); } + String identity( + String guard, + long authorization) + { + return identities.getOrDefault(guard, a -> null).apply(authorization); + } + boolean matchesSession( String clientId) { @@ -63,16 +76,16 @@ boolean matchesSession( } boolean matchesSubscribe( - String topic, - long authorization) + long authorization, + String topic) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic, authorization)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(authorization, topic)); } boolean matchesPublish( - String topic, - long authorization) + long authorization, + String topic) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic, authorization)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(authorization, topic)); } } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java index a3b4cd8616..297e2f4acb 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java @@ -144,9 +144,47 @@ public void shouldWriteCondition() String text = jsonb.toJson(condition); assertThat(text, not(nullValue())); - assertThat(text, equalTo("{\"session\":[{\"client-id\":\"client-1\"}],\"subscribe\":[{\"topic\":\"reply/one\"}," + - "{\"topic\":\"reply/two\"},{\"topic\":\"reply/{id}\",\"params\":{\"id\":\"${guarded['jwt'].identity}\"}}]," + - "\"publish\":[{\"topic\":\"command/one\"},{\"topic\":\"command/two\"},{\"topic\":\"command/{id}\",\"params\":" + - "{\"id\":\"${guarded['jwt'].identity}\"}}]}")); + assertThat(text, equalTo(""" + { + "session": + [ + { + "client-id":"client-1" + } + ], + "subscribe": + [ + { + "topic":"reply/one" + }, + { + "topic":"reply/two" + }, + { + "topic":"reply/{id}", + "params": + { + "id":"${guarded['jwt'].identity}" + } + } + ], + "publish": + [ + { + "topic":"command/one" + }, + { + "topic":"command/two" + }, + { + "topic":"command/{id}", + "params": + { + "id":"${guarded['jwt'].identity}" + } + } + ] + } + """.replaceAll("\\s*\\n\\s*", ""))); } } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java index 87b189ede0..2d49dd1e11 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.List; import java.util.Map; import org.junit.Test; @@ -28,6 +27,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.config.MqttSubscribeConfig; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig; import io.aklivity.zilla.runtime.engine.config.GuardedConfig; +import io.aklivity.zilla.runtime.engine.config.RouteConfig; public class MqttConditionMatcherTest { @@ -38,16 +38,16 @@ public void shouldMatchIsolatedMultiLevelWildcard() MqttConditionMatcher matcher = buildMatcher( "#", "#"); - assertTrue(matcher.matchesPublish("#", 1L)); - assertTrue(matcher.matchesSubscribe("#", 1L)); - assertTrue(matcher.matchesPublish("topic", 1L)); - assertTrue(matcher.matchesSubscribe("topic", 1L)); - assertTrue(matcher.matchesPublish("topic/pub", 1L)); - assertTrue(matcher.matchesSubscribe("topic/sub", 1L)); - assertTrue(matcher.matchesPublish("topic/+/pub", 1L)); - assertTrue(matcher.matchesSubscribe("topic/+/sub", 1L)); - assertTrue(matcher.matchesPublish("topic/pub/#", 1L)); - assertTrue(matcher.matchesSubscribe("topic/sub/#", 1L)); + assertTrue(matcher.matchesPublish(1L, "#")); + assertTrue(matcher.matchesSubscribe(1L, "#")); + assertTrue(matcher.matchesPublish(1L, "topic")); + assertTrue(matcher.matchesSubscribe(1L, "topic")); + assertTrue(matcher.matchesPublish(1L, "topic/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub")); + assertTrue(matcher.matchesPublish(1L, "topic/+/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/+/sub")); + assertTrue(matcher.matchesPublish(1L, "topic/pub/#")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/#")); } @Test @@ -56,26 +56,26 @@ public void shouldMatchMultipleTopicNames() MqttConditionMatcher matcher = buildMatcher( "topic/pub", "topic/sub"); - assertTrue(matcher.matchesPublish("topic/pub", 1L)); - assertTrue(matcher.matchesSubscribe("topic/sub", 1L)); - assertFalse(matcher.matchesPublish("topic/#", 1L)); - assertFalse(matcher.matchesSubscribe("topic/#", 1L)); - assertFalse(matcher.matchesPublish("topic/+", 1L)); - assertFalse(matcher.matchesSubscribe("topic/+", 1L)); - assertFalse(matcher.matchesPublish("topic/sub", 1L)); - assertFalse(matcher.matchesSubscribe("topic/pub", 1L)); - assertFalse(matcher.matchesPublish("topic/pu", 1L)); - assertFalse(matcher.matchesSubscribe("topic/su", 1L)); - assertFalse(matcher.matchesPublish("topic/put", 1L)); - assertFalse(matcher.matchesSubscribe("topic/sup", 1L)); - assertFalse(matcher.matchesPublish("topic/publ", 1L)); - assertFalse(matcher.matchesSubscribe("topic/subs", 1L)); - assertFalse(matcher.matchesPublish("topicpub", 1L)); - assertFalse(matcher.matchesSubscribe("topicsub", 1L)); - assertFalse(matcher.matchesPublish("opic/pub", 1L)); - assertFalse(matcher.matchesSubscribe("opic/sub", 1L)); - assertFalse(matcher.matchesPublish("popic/pub", 1L)); - assertFalse(matcher.matchesSubscribe("zopic/sub", 1L)); + assertTrue(matcher.matchesPublish(1L, "topic/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub")); + assertFalse(matcher.matchesPublish(1L, "topic/#")); + assertFalse(matcher.matchesSubscribe(1L, "topic/#")); + assertFalse(matcher.matchesPublish(1L, "topic/+")); + assertFalse(matcher.matchesSubscribe(1L, "topic/+")); + assertFalse(matcher.matchesPublish(1L, "topic/sub")); + assertFalse(matcher.matchesSubscribe(1L, "topic/pub")); + assertFalse(matcher.matchesPublish(1L, "topic/pu")); + assertFalse(matcher.matchesSubscribe(1L, "topic/su")); + assertFalse(matcher.matchesPublish(1L, "topic/put")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sup")); + assertFalse(matcher.matchesPublish(1L, "topic/publ")); + assertFalse(matcher.matchesSubscribe(1L, "topic/subs")); + assertFalse(matcher.matchesPublish(1L, "topicpub")); + assertFalse(matcher.matchesSubscribe(1L, "topicsub")); + assertFalse(matcher.matchesPublish(1L, "opic/pub")); + assertFalse(matcher.matchesSubscribe(1L, "opic/sub")); + assertFalse(matcher.matchesPublish(1L, "popic/pub")); + assertFalse(matcher.matchesSubscribe(1L, "zopic/sub")); } @Test @@ -84,14 +84,14 @@ public void shouldMatchMultipleTopicNamesWithSingleLevelWildcard() MqttConditionMatcher matcher = buildMatcher( "topic/pub/+", "topic/sub/+"); - assertTrue(matcher.matchesPublish("topic/pub/aa", 1L)); - assertTrue(matcher.matchesSubscribe("topic/sub/bbb", 1L)); - assertTrue(matcher.matchesPublish("topic/pub/+", 1L)); - assertTrue(matcher.matchesSubscribe("topic/sub/+", 1L)); - assertFalse(matcher.matchesPublish("topic/sub/aa", 1L)); - assertFalse(matcher.matchesSubscribe("topic/pub/bbb", 1L)); - assertFalse(matcher.matchesPublish("topic/pub/aa/one", 1L)); - assertFalse(matcher.matchesSubscribe("topic/sub/bbb/two", 1L)); + assertTrue(matcher.matchesPublish(1L, "topic/pub/aa")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/bbb")); + assertTrue(matcher.matchesPublish(1L, "topic/pub/+")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/+")); + assertFalse(matcher.matchesPublish(1L, "topic/sub/aa")); + assertFalse(matcher.matchesSubscribe(1L, "topic/pub/bbb")); + assertFalse(matcher.matchesPublish(1L, "topic/pub/aa/one")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sub/bbb/two")); } @Test @@ -100,12 +100,12 @@ public void shouldMatchMultipleTopicNamesWithSingleAndMultiLevelWildcard() MqttConditionMatcher matcher = buildMatcher( "topic/+/pub/#", "topic/+/sub/#"); - assertTrue(matcher.matchesPublish("topic/x/pub/aa", 1L)); - assertTrue(matcher.matchesSubscribe("topic/y/sub/b", 1L)); - assertTrue(matcher.matchesPublish("topic/x/pub/test/cc", 1L)); - assertTrue(matcher.matchesSubscribe("topic/y/sub/test/bb", 1L)); - assertFalse(matcher.matchesPublish("topic/pub/aa", 1L)); - assertFalse(matcher.matchesSubscribe("topic/sub/b", 1L)); + assertTrue(matcher.matchesPublish(1L, "topic/x/pub/aa")); + assertTrue(matcher.matchesSubscribe(1L, "topic/y/sub/b")); + assertTrue(matcher.matchesPublish(1L, "topic/x/pub/test/cc")); + assertTrue(matcher.matchesSubscribe(1L, "topic/y/sub/test/bb")); + assertFalse(matcher.matchesPublish(1L, "topic/pub/aa")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sub/b")); } @Test @@ -120,24 +120,24 @@ public void shouldMatchTopicNameWithIdentityPlaceholder() Map.of( 1L, "myuser", 2L, "otheruser")); - assertTrue(matcher.matchesPublish("pub/myuser", 1L)); - assertTrue(matcher.matchesSubscribe("sub/myuser", 1L)); - assertTrue(matcher.matchesPublish("pub/otheruser", 2L)); - assertTrue(matcher.matchesSubscribe("sub/otheruser", 2L)); - assertFalse(matcher.matchesPublish("pub/myuser", 2L)); - assertFalse(matcher.matchesSubscribe("sub/myuser", 2L)); - assertFalse(matcher.matchesPublish("pub/otheruser", 1L)); - assertFalse(matcher.matchesSubscribe("sub/otheruser", 1L)); - assertFalse(matcher.matchesPublish("pub/myuset", 1L)); - assertFalse(matcher.matchesSubscribe("sub/myuset", 1L)); - assertFalse(matcher.matchesPublish("pub/myusert", 1L)); - assertFalse(matcher.matchesSubscribe("sub/myusert", 1L)); - assertFalse(matcher.matchesPublish("pub/myuser/a", 1L)); - assertFalse(matcher.matchesSubscribe("sub/myuser/a", 1L)); - assertFalse(matcher.matchesPublish("pub/myuser", 3L)); - assertFalse(matcher.matchesSubscribe("sub/myuser", 3L)); - assertFalse(matcher.matchesPublish("pub/null", 3L)); - assertFalse(matcher.matchesSubscribe("sub/null", 3L)); + assertTrue(matcher.matchesPublish(1L, "pub/myuser")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser")); + assertFalse(matcher.matchesPublish(1L, "pub/myuset")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuset")); + assertFalse(matcher.matchesPublish(1L, "pub/myusert")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myusert")); + assertFalse(matcher.matchesPublish(1L, "pub/myuser/a")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser/a")); + assertFalse(matcher.matchesPublish(3L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(3L, "sub/myuser")); + assertFalse(matcher.matchesPublish(3L, "pub/null")); + assertFalse(matcher.matchesSubscribe(3L, "sub/null")); } @Test @@ -152,22 +152,22 @@ public void shouldMatchTopicNameWithIdentityPlaceholderAndMultiLevelWildcard() Map.of( 1L, "myuser", 2L, "otheruser")); - assertTrue(matcher.matchesPublish("pub/myuser/pubtest", 1L)); - assertTrue(matcher.matchesSubscribe("sub/myuser/subtest", 1L)); - assertTrue(matcher.matchesPublish("pub/myuser/pubtest/aaa", 1L)); - assertTrue(matcher.matchesSubscribe("sub/myuser/subtest/aa", 1L)); - assertTrue(matcher.matchesPublish("pub/otheruser/pubtest", 2L)); - assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest", 2L)); - assertTrue(matcher.matchesPublish("pub/otheruser/pubtest/aa", 2L)); - assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 2L)); - assertFalse(matcher.matchesPublish("pub/myuser/pubtest", 2L)); - assertFalse(matcher.matchesSubscribe("sub/myuser/subtest", 2L)); - assertFalse(matcher.matchesPublish("pub/myuser/pubtest/aaa", 2L)); - assertFalse(matcher.matchesSubscribe("sub/myuser/subtest/aa", 2L)); - assertFalse(matcher.matchesPublish("pub/otheruser/pubtest", 1L)); - assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest", 1L)); - assertFalse(matcher.matchesPublish("pub/otheruser/pubtest/aa", 1L)); - assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 1L)); + assertTrue(matcher.matchesPublish(1L, "pub/myuser/pubtest")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser/subtest")); + assertTrue(matcher.matchesPublish(1L, "pub/myuser/pubtest/aaa")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser/subtest/aa")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser/pubtest")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser/subtest")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser/pubtest/aa")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser/subtest/aa")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser/pubtest")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser/subtest")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser/pubtest/aaa")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser/subtest/aa")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser/pubtest")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser/subtest")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser/pubtest/aa")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser/subtest/aa")); } @Test @@ -182,12 +182,12 @@ public void shouldNotMatchTopicNameWithInvalidIdentityPlaceholder() Map.of( 1L, "myuser", 2L, "otheruser")); - assertFalse(matcher.matchesPublish("pub/{id}", 1L)); - assertFalse(matcher.matchesSubscribe("sub/{id}", 1L)); - assertFalse(matcher.matchesPublish("pub/myuser", 1L)); - assertFalse(matcher.matchesSubscribe("sub/myuser", 1L)); - assertFalse(matcher.matchesPublish("pub/otheruser", 2L)); - assertFalse(matcher.matchesSubscribe("sub/otheruser", 2L)); + assertFalse(matcher.matchesPublish(1L, "pub/{id}")); + assertFalse(matcher.matchesSubscribe(1L, "sub/{id}")); + assertFalse(matcher.matchesPublish(1L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser")); + assertFalse(matcher.matchesPublish(2L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/otheruser")); } @Test @@ -196,10 +196,10 @@ public void shouldMatchTopicNameWithUnconstrainedParam() MqttConditionMatcher matcher = buildMatcher( "pub/{id}", "sub/{id}"); - assertTrue(matcher.matchesPublish("pub/aaa", 1L)); - assertTrue(matcher.matchesSubscribe("sub/aaa", 1L)); - assertTrue(matcher.matchesPublish("pub/bbb", 2L)); - assertTrue(matcher.matchesSubscribe("sub/bbb", 2L)); + assertTrue(matcher.matchesPublish(1L, "pub/aaa")); + assertTrue(matcher.matchesSubscribe(1L, "sub/aaa")); + assertTrue(matcher.matchesPublish(2L, "pub/bbb")); + assertTrue(matcher.matchesSubscribe(2L, "sub/bbb")); } @Test @@ -214,10 +214,10 @@ public void shouldNotMatchTopicNameWithNonExistentParamConstraint() Map.of( 1L, "myuser", 2L, "otheruser")); - assertFalse(matcher.matchesPublish("pub/myuser", 1L)); - assertFalse(matcher.matchesSubscribe("sub/myuser", 1L)); - assertFalse(matcher.matchesPublish("pub/otheruser", 2L)); - assertFalse(matcher.matchesSubscribe("sub/otheruser", 2L)); + assertFalse(matcher.matchesPublish(1L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser")); + assertFalse(matcher.matchesPublish(2L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/otheruser")); } private static MqttConditionMatcher buildMatcher( @@ -260,6 +260,9 @@ private static MqttConditionMatcher buildMatcher( .name(guardName) .build(); guarded.identity = identities::get; - return new MqttConditionMatcher(condition, List.of(guarded)); + var route = new MqttRouteConfig(RouteConfig.builder() + .guarded(guarded) + .build()); + return new MqttConditionMatcher(route, condition); } }