diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfig.java index 2de4aab1b0..714bbac464 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfig.java @@ -17,12 +17,15 @@ import static java.util.function.Function.identity; +import java.util.List; import java.util.function.Function; public class MqttPublishConfig { public final String topic; + public final List params; + public static MqttPublishConfigBuilder builder() { return new MqttPublishConfigBuilder<>(identity()); @@ -35,8 +38,10 @@ public static MqttPublishConfigBuilder builder( } MqttPublishConfig( - String topic) + String topic, + List params) { this.topic = topic; + this.params = params; } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java index cf01fc1cfa..7e8f0395f9 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java @@ -15,6 +15,8 @@ */ package io.aklivity.zilla.runtime.binding.mqtt.config; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; @@ -25,6 +27,8 @@ public class MqttPublishConfigBuilder extends ConfigBuilder params; + MqttPublishConfigBuilder( Function mapper) { @@ -45,9 +49,25 @@ public MqttPublishConfigBuilder topic( return this; } + public MqttPublishConfigBuilder param( + MqttTopicParamConfig param) + { + if (this.params == null) + { + this.params = new ArrayList<>(); + } + this.params.add(param); + return this; + } + + public MqttTopicParamConfigBuilder> param() + { + return new MqttTopicParamConfigBuilder<>(this::param); + } + @Override public T build() { - return mapper.apply(new MqttPublishConfig(topic)); + return mapper.apply(new MqttPublishConfig(topic, params)); } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfig.java index 03f30ee8b2..8faefab05e 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfig.java @@ -17,12 +17,15 @@ import static java.util.function.Function.identity; +import java.util.List; import java.util.function.Function; public class MqttSubscribeConfig { public final String topic; + public final List params; + public static MqttSubscribeConfigBuilder builder() { return new MqttSubscribeConfigBuilder<>(identity()); @@ -35,8 +38,10 @@ public static MqttSubscribeConfigBuilder builder( } MqttSubscribeConfig( - String topic) + String topic, + List params) { this.topic = topic; + this.params = params; } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java index 494fa2ee7c..943a200bab 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java @@ -15,6 +15,8 @@ */ package io.aklivity.zilla.runtime.binding.mqtt.config; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; @@ -25,6 +27,8 @@ public class MqttSubscribeConfigBuilder extends ConfigBuilder params; + MqttSubscribeConfigBuilder( Function mapper) { @@ -45,9 +49,25 @@ public MqttSubscribeConfigBuilder topic( return this; } + public MqttSubscribeConfigBuilder param( + MqttTopicParamConfig param) + { + if (this.params == null) + { + this.params = new ArrayList<>(); + } + this.params.add(param); + return this; + } + + public MqttTopicParamConfigBuilder> param() + { + return new MqttTopicParamConfigBuilder<>(this::param); + } + @Override public T build() { - return mapper.apply(new MqttSubscribeConfig(topic)); + return mapper.apply(new MqttSubscribeConfig(topic, params)); } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfig.java new file mode 100644 index 0000000000..57cd8dd73c --- /dev/null +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.config; + +import static java.util.function.Function.identity; + +import java.util.function.Function; + +public class MqttTopicParamConfig +{ + public final String name; + + public final String value; + + public static MqttTopicParamConfigBuilder builder() + { + return new MqttTopicParamConfigBuilder<>(identity()); + } + + public static MqttTopicParamConfigBuilder builder( + Function mapper) + { + return new MqttTopicParamConfigBuilder<>(mapper); + } + + MqttTopicParamConfig( + String name, + String value) + { + this.name = name; + this.value = value; + } +} diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfigBuilder.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfigBuilder.java new file mode 100644 index 0000000000..cde1ae6dfa --- /dev/null +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttTopicParamConfigBuilder.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.config; + +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; + +public class MqttTopicParamConfigBuilder extends ConfigBuilder> +{ + private final Function mapper; + + private String name; + + private String value; + + MqttTopicParamConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + public MqttTopicParamConfigBuilder name( + String name) + { + this.name = name; + return this; + } + + public MqttTopicParamConfigBuilder value( + String value) + { + this.value = value; + return this; + } + + @Override + public T build() + { + return mapper.apply(new MqttTopicParamConfig(name, value)); + } +} diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java index f6dd199e19..02813e2018 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic)) + .filter(r -> r.authorized(authorization) && r.matchesSubscribe(authorization, topic)) .findFirst() .orElse(null); } @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesPublish(topic)) + .filter(r -> r.authorized(authorization) && r.matchesPublish(authorization, topic)) .findFirst() .orElse(null); } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java index 68aa3703f6..911b47c4c6 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java @@ -24,6 +24,8 @@ import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfigBuilder; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPublishConfigBuilder; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttSubscribeConfigBuilder; import io.aklivity.zilla.runtime.binding.mqtt.internal.MqttBinding; import io.aklivity.zilla.runtime.engine.config.ConditionConfig; import io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi; @@ -35,6 +37,7 @@ public final class MqttConditionConfigAdapter implements ConditionConfigAdapterS private static final String PUBLISH_NAME = "publish"; private static final String CLIENT_ID_NAME = "client-id"; private static final String TOPIC_NAME = "topic"; + private static final String PARAMS_NAME = "params"; private static final String CLIENT_ID_DEFAULT = "*"; @@ -72,10 +75,16 @@ public JsonObject adaptToJson( { JsonArrayBuilder subscribes = Json.createArrayBuilder(); - mqttCondition.subscribes.forEach(s -> + mqttCondition.subscribes.forEach(sub -> { JsonObjectBuilder subscribeJson = Json.createObjectBuilder(); - subscribeJson.add(TOPIC_NAME, s.topic); + subscribeJson.add(TOPIC_NAME, sub.topic); + if (sub.params != null) + { + JsonObjectBuilder params = Json.createObjectBuilder(); + sub.params.forEach(p -> params.add(p.name, p.value)); + subscribeJson.add(PARAMS_NAME, params); + } subscribes.add(subscribeJson); }); object.add(SUBSCRIBE_NAME, subscribes); @@ -85,10 +94,16 @@ public JsonObject adaptToJson( { JsonArrayBuilder publishes = Json.createArrayBuilder(); - mqttCondition.publishes.forEach(p -> + mqttCondition.publishes.forEach(pub -> { JsonObjectBuilder publishJson = Json.createObjectBuilder(); - publishJson.add(TOPIC_NAME, p.topic); + publishJson.add(TOPIC_NAME, pub.topic); + if (pub.params != null) + { + JsonObjectBuilder params = Json.createObjectBuilder(); + pub.params.forEach(p -> params.add(p.name, p.value)); + publishJson.add(PARAMS_NAME, params); + } publishes.add(publishJson); }); object.add(PUBLISH_NAME, publishes); @@ -121,11 +136,24 @@ public ConditionConfig adaptFromJson( JsonArray subscribesJson = object.getJsonArray(SUBSCRIBE_NAME); subscribesJson.forEach(s -> { - String topic = s.asJsonObject().getString(TOPIC_NAME); + JsonObject subscribeJson = s.asJsonObject(); - mqttConfig.subscribe() - .topic(topic) - .build(); + MqttSubscribeConfigBuilder subscribe = mqttConfig + .subscribe() + .topic(subscribeJson.getString(TOPIC_NAME)); + + if (subscribeJson.containsKey(PARAMS_NAME)) + { + JsonObject paramsJson = subscribeJson.getJsonObject(PARAMS_NAME); + + paramsJson.keySet().forEach(n -> + subscribe.param() + .name(n) + .value(paramsJson.getString(n)) + .build()); + } + + subscribe.build(); }); } @@ -134,11 +162,24 @@ public ConditionConfig adaptFromJson( JsonArray publishesJson = object.getJsonArray(PUBLISH_NAME); publishesJson.forEach(p -> { - String topic = p.asJsonObject().getString(TOPIC_NAME); + JsonObject publishJson = p.asJsonObject(); - mqttConfig.publish() - .topic(topic) - .build(); + MqttPublishConfigBuilder publish = mqttConfig + .publish() + .topic(publishJson.getString(TOPIC_NAME)); + + if (publishJson.containsKey(PARAMS_NAME)) + { + JsonObject paramsJson = publishJson.getJsonObject(PARAMS_NAME); + + paramsJson.keySet().forEach(n -> + publish.param() + .name(n) + .value(paramsJson.getString(n)) + .build()); + } + + publish.build(); }); } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java index 7d524e2b34..b5be991df9 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java @@ -16,31 +16,44 @@ package io.aklivity.zilla.runtime.binding.mqtt.internal.config; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.LongFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.agrona.collections.LongObjPredicate; + import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig; public final class MqttConditionMatcher { + private static final Pattern IDENTITY_PATTERN = + Pattern.compile("\\$\\{guarded(?:\\['([a-zA-Z]+[a-zA-Z0-9\\._\\:\\-]*)'\\]).identity\\}"); + + private final Matcher identityMatcher = IDENTITY_PATTERN.matcher(""); + private final List sessionMatchers; - private final List subscribeMatchers; - private final List publishMatchers; + private final List subscribeMatchers; + private final List publishMatchers; public MqttConditionMatcher( + MqttRouteConfig route, MqttConditionConfig condition) { - this.sessionMatchers = - condition.sessions != null && !condition.sessions.isEmpty() ? - asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null; - this.subscribeMatchers = - condition.subscribes != null && !condition.subscribes.isEmpty() ? - asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; - this.publishMatchers = - condition.publishes != null && !condition.publishes.isEmpty() ? - asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; + this.sessionMatchers = condition.sessions != null && !condition.sessions.isEmpty() + ? asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) + : null; + this.subscribeMatchers = condition.subscribes != null && !condition.subscribes.isEmpty() + ? condition.subscribes.stream().map(s -> new TopicMatcher(route::identity, s.topic, s.params)).toList() + : null; + this.publishMatchers = condition.publishes != null && !condition.publishes.isEmpty() + ? condition.publishes.stream().map(p -> new TopicMatcher(route::identity, p.topic, p.params)).toList() + : null; } public boolean matchesSession( @@ -62,14 +75,15 @@ public boolean matchesSession( } public boolean matchesSubscribe( + long authorization, String topic) { boolean match = false; if (subscribeMatchers != null) { - for (Matcher matcher : subscribeMatchers) + for (TopicMatcher matcher : subscribeMatchers) { - match = matcher.reset(topic).matches(); + match = matcher.matches(authorization, topic); if (match) { break; @@ -80,14 +94,15 @@ public boolean matchesSubscribe( } public boolean matchesPublish( + long authorization, String topic) { boolean match = false; if (publishMatchers != null) { - for (Matcher matcher : publishMatchers) + for (TopicMatcher matcher : publishMatchers) { - match = matcher.reset(topic).matches(); + match = matcher.matches(authorization, topic); if (match) { break; @@ -116,19 +131,86 @@ private static List asWildcardMatcher( return matchers; } - private static List asTopicMatcher( - List wildcards) + private final class TopicMatcher { - List matchers = new ArrayList<>(); - for (String wildcard : wildcards) + private final Matcher matchTopic; + private final Map> matchParams; + + private TopicMatcher( + Function> identities, + String wildcard, + List params) { - matchers.add(Pattern.compile(wildcard + this.matchTopic = Pattern.compile(wildcard .replace(".", "\\.") .replace("$", "\\$") .replace("+", "[^/]*") - .replace("#", ".*")).matcher("")); + .replace("#", ".*") + .replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher(""); + this.matchParams = params != null + ? params.stream().collect(Collectors.toMap( + p -> p.name, + p -> asTopicParamMatcher(identities, p.value))) + : null; + Collection topicParams = matchTopic.namedGroups().keySet(); + topicParams.stream() + .filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp))) + .forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp)); + if (params != null) + { + params.stream() + .filter(p -> !topicParams.contains(p.name)) + .forEach(p -> System.out.printf("Undefined param constraint for MQTT topic %s: %s\n", wildcard, p.name)); + } + } + + private boolean matches( + long authorization, + String topic) + { + return matchTopic.reset(topic).matches() && + matchParams(name -> + { + try + { + return matchTopic.group(name); + } + catch (IllegalArgumentException e) + { + return null; + } + }, authorization); + } + + private boolean matchParams( + Function valuesByName, + long authorization) + { + return matchParams == null || + matchParams.entrySet().stream() + .allMatch(e -> e.getValue().test(authorization, valuesByName.apply(e.getKey()))); + } + + private LongObjPredicate asTopicParamMatcher( + Function> identities, + String value) + { + return (identityMatcher.reset(value).matches()) + ? asTopicParamIdentityMatcher(identities.apply(identityMatcher.group(1))) + : asTopicParamValueMatcher(value); + } + + private static LongObjPredicate asTopicParamIdentityMatcher( + LongFunction identity) + { + return (a, v) -> v != null && identity != null && v.equals(identity.apply(a)); + } + + private static LongObjPredicate asTopicParamValueMatcher( + String expected) + { + return (a, v) -> v != null && v.equals(expected); } - return matchers; } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java index 983760993f..d01220c041 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java @@ -17,8 +17,11 @@ import static io.aklivity.zilla.runtime.engine.config.WithConfig.NO_COMPOSITE_ID; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; import java.util.function.LongPredicate; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; @@ -32,17 +35,20 @@ public final class MqttRouteConfig private final List when; private final MqttWithConfig with; private final LongPredicate authorized; + private final Map> identities; public MqttRouteConfig( RouteConfig route) { this.id = route.id; + this.with = (MqttWithConfig) route.with; + this.authorized = route.authorized; + this.identities = route.guarded.stream() + .collect(toMap(g -> g.name, g -> g.identity)); this.when = route.when.stream() .map(MqttConditionConfig.class::cast) - .map(MqttConditionMatcher::new) + .map(c -> new MqttConditionMatcher(this, c)) .collect(toList()); - this.with = (MqttWithConfig) route.with; - this.authorized = route.authorized; } public long compositeId() @@ -56,6 +62,12 @@ boolean authorized( return authorized.test(authorization); } + LongFunction identity( + String guard) + { + return identities.get(guard); + } + boolean matchesSession( String clientId) { @@ -63,14 +75,16 @@ boolean matchesSession( } boolean matchesSubscribe( + long authorization, String topic) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(authorization, topic)); } boolean matchesPublish( + long authorization, String topic) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(authorization, topic)); } } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java index d4f2aa4e36..297e2f4acb 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig; public class MqttConditionConfigAdapterTest { @@ -61,6 +62,12 @@ public void shouldReadCondition() "{" + "\"topic\": \"reply/two\"" + "}," + + "{" + + "\"topic\": \"reply/{id}\"," + + "\"params\": {" + + "\"id\": \"${guarded['jwt'].identity}\"" + + "}" + + "}," + "]," + "\"publish\":" + "[" + @@ -69,7 +76,13 @@ public void shouldReadCondition() "}," + "{" + "\"topic\": \"command/two\"" + - "}" + + "}," + + "{" + + "\"topic\": \"command/{id}\"," + + "\"params\": {" + + "\"id\": \"${guarded['jwt'].identity}\"" + + "}" + + "}," + "]" + "}"; @@ -81,9 +94,15 @@ public void shouldReadCondition() assertThat(condition.subscribes, not(nullValue())); assertThat(condition.subscribes.get(0).topic, equalTo("reply/one")); assertThat(condition.subscribes.get(1).topic, equalTo("reply/two")); + assertThat(condition.subscribes.get(2).topic, equalTo("reply/{id}")); + assertThat(condition.subscribes.get(2).params.get(0).name, equalTo("id")); + assertThat(condition.subscribes.get(2).params.get(0).value, equalTo("${guarded['jwt'].identity}")); assertThat(condition.publishes, not(nullValue())); assertThat(condition.publishes.get(0).topic, equalTo("command/one")); assertThat(condition.publishes.get(1).topic, equalTo("command/two")); + assertThat(condition.publishes.get(2).topic, equalTo("command/{id}")); + assertThat(condition.publishes.get(2).params.get(0).name, equalTo("id")); + assertThat(condition.publishes.get(2).params.get(0).value, equalTo("${guarded['jwt'].identity}")); } @Test @@ -100,18 +119,72 @@ public void shouldWriteCondition() .subscribe() .topic("reply/two") .build() + .subscribe() + .topic("reply/{id}") + .param(MqttTopicParamConfig.builder() + .name("id") + .value("${guarded['jwt'].identity}") + .build()) + .build() .publish() .topic("command/one") .build() .publish() .topic("command/two") .build() + .publish() + .topic("command/{id}") + .param(MqttTopicParamConfig.builder() + .name("id") + .value("${guarded['jwt'].identity}") + .build()) + .build() .build(); String text = jsonb.toJson(condition); assertThat(text, not(nullValue())); - assertThat(text, equalTo("{\"session\":[{\"client-id\":\"client-1\"}],\"subscribe\":[{\"topic\":\"reply/one\"}," + - "{\"topic\":\"reply/two\"}],\"publish\":[{\"topic\":\"command/one\"},{\"topic\":\"command/two\"}]}")); + assertThat(text, equalTo(""" + { + "session": + [ + { + "client-id":"client-1" + } + ], + "subscribe": + [ + { + "topic":"reply/one" + }, + { + "topic":"reply/two" + }, + { + "topic":"reply/{id}", + "params": + { + "id":"${guarded['jwt'].identity}" + } + } + ], + "publish": + [ + { + "topic":"command/one" + }, + { + "topic":"command/two" + }, + { + "topic":"command/{id}", + "params": + { + "id":"${guarded['jwt'].identity}" + } + } + ] + } + """.replaceAll("\\s*\\n\\s*", ""))); } } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java new file mode 100644 index 0000000000..2d49dd1e11 --- /dev/null +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java @@ -0,0 +1,268 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.internal.config; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Map; + +import org.junit.Test; + +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPublishConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttSubscribeConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig; +import io.aklivity.zilla.runtime.engine.config.GuardedConfig; +import io.aklivity.zilla.runtime.engine.config.RouteConfig; + +public class MqttConditionMatcherTest +{ + + @Test + public void shouldMatchIsolatedMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "#", + "#"); + assertTrue(matcher.matchesPublish(1L, "#")); + assertTrue(matcher.matchesSubscribe(1L, "#")); + assertTrue(matcher.matchesPublish(1L, "topic")); + assertTrue(matcher.matchesSubscribe(1L, "topic")); + assertTrue(matcher.matchesPublish(1L, "topic/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub")); + assertTrue(matcher.matchesPublish(1L, "topic/+/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/+/sub")); + assertTrue(matcher.matchesPublish(1L, "topic/pub/#")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/#")); + } + + @Test + public void shouldMatchMultipleTopicNames() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/pub", + "topic/sub"); + assertTrue(matcher.matchesPublish(1L, "topic/pub")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub")); + assertFalse(matcher.matchesPublish(1L, "topic/#")); + assertFalse(matcher.matchesSubscribe(1L, "topic/#")); + assertFalse(matcher.matchesPublish(1L, "topic/+")); + assertFalse(matcher.matchesSubscribe(1L, "topic/+")); + assertFalse(matcher.matchesPublish(1L, "topic/sub")); + assertFalse(matcher.matchesSubscribe(1L, "topic/pub")); + assertFalse(matcher.matchesPublish(1L, "topic/pu")); + assertFalse(matcher.matchesSubscribe(1L, "topic/su")); + assertFalse(matcher.matchesPublish(1L, "topic/put")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sup")); + assertFalse(matcher.matchesPublish(1L, "topic/publ")); + assertFalse(matcher.matchesSubscribe(1L, "topic/subs")); + assertFalse(matcher.matchesPublish(1L, "topicpub")); + assertFalse(matcher.matchesSubscribe(1L, "topicsub")); + assertFalse(matcher.matchesPublish(1L, "opic/pub")); + assertFalse(matcher.matchesSubscribe(1L, "opic/sub")); + assertFalse(matcher.matchesPublish(1L, "popic/pub")); + assertFalse(matcher.matchesSubscribe(1L, "zopic/sub")); + } + + @Test + public void shouldMatchMultipleTopicNamesWithSingleLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/pub/+", + "topic/sub/+"); + assertTrue(matcher.matchesPublish(1L, "topic/pub/aa")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/bbb")); + assertTrue(matcher.matchesPublish(1L, "topic/pub/+")); + assertTrue(matcher.matchesSubscribe(1L, "topic/sub/+")); + assertFalse(matcher.matchesPublish(1L, "topic/sub/aa")); + assertFalse(matcher.matchesSubscribe(1L, "topic/pub/bbb")); + assertFalse(matcher.matchesPublish(1L, "topic/pub/aa/one")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sub/bbb/two")); + } + + @Test + public void shouldMatchMultipleTopicNamesWithSingleAndMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/+/pub/#", + "topic/+/sub/#"); + assertTrue(matcher.matchesPublish(1L, "topic/x/pub/aa")); + assertTrue(matcher.matchesSubscribe(1L, "topic/y/sub/b")); + assertTrue(matcher.matchesPublish(1L, "topic/x/pub/test/cc")); + assertTrue(matcher.matchesSubscribe(1L, "topic/y/sub/test/bb")); + assertFalse(matcher.matchesPublish(1L, "topic/pub/aa")); + assertFalse(matcher.matchesSubscribe(1L, "topic/sub/b")); + } + + @Test + public void shouldMatchTopicNameWithIdentityPlaceholder() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{id}", + Map.of("id", "${guarded['gname'].identity}"), + "sub/{id}", + Map.of("id", "${guarded['gname'].identity}"), + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertTrue(matcher.matchesPublish(1L, "pub/myuser")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser")); + assertFalse(matcher.matchesPublish(1L, "pub/myuset")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuset")); + assertFalse(matcher.matchesPublish(1L, "pub/myusert")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myusert")); + assertFalse(matcher.matchesPublish(1L, "pub/myuser/a")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser/a")); + assertFalse(matcher.matchesPublish(3L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(3L, "sub/myuser")); + assertFalse(matcher.matchesPublish(3L, "pub/null")); + assertFalse(matcher.matchesSubscribe(3L, "sub/null")); + } + + @Test + public void shouldMatchTopicNameWithIdentityPlaceholderAndMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{id}/#", + Map.of("id", "${guarded['gname'].identity}"), + "sub/{id}/#", + Map.of("id", "${guarded['gname'].identity}"), + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertTrue(matcher.matchesPublish(1L, "pub/myuser/pubtest")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser/subtest")); + assertTrue(matcher.matchesPublish(1L, "pub/myuser/pubtest/aaa")); + assertTrue(matcher.matchesSubscribe(1L, "sub/myuser/subtest/aa")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser/pubtest")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser/subtest")); + assertTrue(matcher.matchesPublish(2L, "pub/otheruser/pubtest/aa")); + assertTrue(matcher.matchesSubscribe(2L, "sub/otheruser/subtest/aa")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser/pubtest")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser/subtest")); + assertFalse(matcher.matchesPublish(2L, "pub/myuser/pubtest/aaa")); + assertFalse(matcher.matchesSubscribe(2L, "sub/myuser/subtest/aa")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser/pubtest")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser/subtest")); + assertFalse(matcher.matchesPublish(1L, "pub/otheruser/pubtest/aa")); + assertFalse(matcher.matchesSubscribe(1L, "sub/otheruser/subtest/aa")); + } + + @Test + public void shouldNotMatchTopicNameWithInvalidIdentityPlaceholder() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{id}", + Map.of("id", "${guarded['invalid'].identity}"), + "sub/{id}", + Map.of("id", "${guarded['invalid'].identity}"), + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertFalse(matcher.matchesPublish(1L, "pub/{id}")); + assertFalse(matcher.matchesSubscribe(1L, "sub/{id}")); + assertFalse(matcher.matchesPublish(1L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser")); + assertFalse(matcher.matchesPublish(2L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/otheruser")); + } + + @Test + public void shouldMatchTopicNameWithUnconstrainedParam() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{id}", + "sub/{id}"); + assertTrue(matcher.matchesPublish(1L, "pub/aaa")); + assertTrue(matcher.matchesSubscribe(1L, "sub/aaa")); + assertTrue(matcher.matchesPublish(2L, "pub/bbb")); + assertTrue(matcher.matchesSubscribe(2L, "sub/bbb")); + } + + @Test + public void shouldNotMatchTopicNameWithNonExistentParamConstraint() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{id}", + Map.of("other", "${guarded['gname'].identity}"), + "sub/{id}", + Map.of("other", "${guarded['gname'].identity}"), + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertFalse(matcher.matchesPublish(1L, "pub/myuser")); + assertFalse(matcher.matchesSubscribe(1L, "sub/myuser")); + assertFalse(matcher.matchesPublish(2L, "pub/otheruser")); + assertFalse(matcher.matchesSubscribe(2L, "sub/otheruser")); + } + + private static MqttConditionMatcher buildMatcher( + String publishTopic, + String subscribeTopic) + { + return buildMatcher(publishTopic, Map.of(), subscribeTopic, Map.of(), "", Map.of()); + } + + private static MqttConditionMatcher buildMatcher( + String publishTopic, + Map publishParams, + String subscribeTopic, + Map subscribeParams, + String guardName, + Map identities) + { + var publishConfigBuilder = MqttPublishConfig.builder() + .topic(publishTopic); + publishParams.forEach((k, v) -> publishConfigBuilder + .param(MqttTopicParamConfig.builder() + .name(k) + .value(v) + .build())); + + var subscribeConfigBuilder = MqttSubscribeConfig.builder() + .topic(subscribeTopic); + subscribeParams.forEach((k, v) -> subscribeConfigBuilder + .param(MqttTopicParamConfig.builder() + .name(k) + .value(v) + .build())); + + MqttConditionConfig condition = MqttConditionConfig.builder() + .publish(publishConfigBuilder.build()) + .subscribe(subscribeConfigBuilder.build()) + .build(); + + GuardedConfig guarded = GuardedConfig.builder() + .name(guardName) + .build(); + guarded.identity = identities::get; + var route = new MqttRouteConfig(RouteConfig.builder() + .guarded(guarded) + .build()); + return new MqttConditionMatcher(route, condition); + } +} diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java index 8b33e513f8..07a6d1d42e 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java @@ -513,4 +513,24 @@ public void shouldRejectLargeMessage() throws Exception { k3po.finish(); } + + @Test + @Configuration("server.guarded.identity.topic.param.yaml") + @Specification({ + "${net}/publish.topic.guarded.identity.param/client", + "${app}/publish.topic.guarded.identity.param/server"}) + public void shouldPublishToTopicWithGuardedIdentityParam() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("server.guarded.identity.topic.param.yaml") + @Specification({ + "${net}/publish.invalid.topic.guarded.identity.param/client", + "${app}/publish.invalid.topic.guarded.identity.param/server"}) + public void shouldRejectTopicWithInvalidGuardedIdentityParam() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml new file mode 100644 index 0000000000..41ad54e493 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml @@ -0,0 +1,48 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +guards: + test0: + type: test + options: + credentials: TOKEN + lifetime: PT5S + challenge: PT5S +bindings: + net0: + type: mqtt + kind: server + options: + authorization: + test0: + credentials: + connect: + username: Bearer {credentials} + routes: + - exit: app0 + guarded: + test0: [] + when: + - publish: + - topic: topic/{id}/pub + params: + id: ${guarded['test0'].identity} + - subscribe: + - topic: topic/{id}/sub + params: + id: ${guarded['test0'].identity} diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json index 7303b0ad33..421d86bb97 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json @@ -85,6 +85,20 @@ "title": "Topic", "type": "string", "pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$" + }, + "params": + { + "title": "Params", + "type": "object", + "additionalProperties": false, + "patternProperties": + { + "^[a-zA-Z0-9]+$": + { + "type": "string", + "pattern": "^[a-zA-Z0-9{}\\[\\]\\.\\$']*$" + } + } } } } @@ -97,11 +111,27 @@ { "type": "object", "additionalProperties": false, - "properties": { - "topic": { + "properties": + { + "topic": + { "title": "Topic", "type": "string", "pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$" + }, + "params": + { + "title": "Params", + "type": "object", + "additionalProperties": false, + "patternProperties": + { + "^[a-zA-Z0-9]+$": + { + "type": "string", + "pattern": "^[a-zA-Z0-9{}\\[\\]\\.\\$']*$" + } + } } } } diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..6a1f82cdb2 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/client.rpt @@ -0,0 +1,46 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty + +write abort diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..b39a4b5342 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.invalid.topic.guarded.identity.param/server.rpt @@ -0,0 +1,49 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + +read aborted diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..66063e56b8 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt @@ -0,0 +1,69 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty +read notify RECEIVED_SESSION_STATE + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("topic/test/pub") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..f99c56f60f --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt @@ -0,0 +1,67 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("topic/test/pub") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..c646cbcb6e --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/client.rpt @@ -0,0 +1,48 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +write [0x30 0x18] # PUBLISH + [0x00 0x0e] "topic/none/pub" # topic name + [0x00] # properties + "message" # payload + +read [0xe0 0x02] # disconnect header + [0x83] # reason = implementation specific error + [0x00] # properties = none + +read closed diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..275a978e39 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.invalid.topic.guarded.identity.param/server.rpt @@ -0,0 +1,44 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties = none + +write [0xe0 0x02] # disconnect header + [0x83] # reason = implementation specific error + [0x00] # properties = none + +write close diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..7e552c6135 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt @@ -0,0 +1,42 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +write [0x30 0x18] # PUBLISH + [0x00 0x0e] "topic/test/pub" # topic name + [0x00] # properties + "message" # payload diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..38f7ffea07 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt @@ -0,0 +1,43 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties = none + +read [0x30 0x18] # PUBLISH + [0x00 0x0e] "topic/test/pub" # topic name + [0x00] # properties + "message" # payload diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java index 54023b6817..5866771f40 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java @@ -157,4 +157,12 @@ public void shouldRejectServerWhenSubscribeTopicInvalid() { schema.validate("server.when.subscribe.topic.invalid.yaml"); } + + @Test + public void shouldValidateServerWithGuardedIdentityTopicParam() + { + JsonObject config = schema.validate("server.guarded.identity.topic.param.yaml"); + + assertThat(config, not(nullValue())); + } } diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java index cb47367fe3..e6ea093ed0 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java @@ -436,4 +436,22 @@ public void shouldRejectLargeMessage() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/publish.topic.guarded.identity.param/client", + "${net}/publish.topic.guarded.identity.param/server"}) + public void shouldPublishToTopicWithGuardedIdentityParam() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/publish.invalid.topic.guarded.identity.param/client", + "${net}/publish.invalid.topic.guarded.identity.param/server"}) + public void shouldRejectTopicWithInvalidGuardedIdentityParam() throws Exception + { + k3po.finish(); + } }