From b9738b8aea8debd4f84e50adbea6f36addf50bfe Mon Sep 17 00:00:00 2001 From: "epieffe.eth" Date: Sat, 22 Feb 2025 17:17:31 +0100 Subject: [PATCH] Apply suggestions --- .../config/MqttConditionConfigAdapter.java | 23 +- .../internal/config/MqttConditionMatcher.java | 203 ++++++++---------- .../mqtt/internal/config/MqttRouteConfig.java | 15 +- 3 files changed, 112 insertions(+), 129 deletions(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java index 7bba6a0411..911b47c4c6 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java @@ -20,7 +20,6 @@ import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; import jakarta.json.JsonObjectBuilder; -import jakarta.json.JsonString; import jakarta.json.bind.adapter.JsonbAdapter; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; @@ -145,10 +144,13 @@ public ConditionConfig adaptFromJson( if (subscribeJson.containsKey(PARAMS_NAME)) { - subscribeJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> subscribe.param() - .name(n) - .value(JsonString.class.cast(v).getString()) - .build()); + JsonObject paramsJson = subscribeJson.getJsonObject(PARAMS_NAME); + + paramsJson.keySet().forEach(n -> + subscribe.param() + .name(n) + .value(paramsJson.getString(n)) + .build()); } subscribe.build(); @@ -168,10 +170,13 @@ public ConditionConfig adaptFromJson( if (publishJson.containsKey(PARAMS_NAME)) { - publishJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> publish.param() - .name(n) - .value(JsonString.class.cast(v).getString()) - .build()); + JsonObject paramsJson = publishJson.getJsonObject(PARAMS_NAME); + + paramsJson.keySet().forEach(n -> + publish.param() + .name(n) + .value(paramsJson.getString(n)) + .build()); } publish.build(); diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java index a8954264a2..0cef789a38 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java @@ -18,6 +18,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.LongFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -32,27 +35,25 @@ public final class MqttConditionMatcher private static final Pattern IDENTITY_PATTERN = Pattern.compile("\\$\\{guarded(?:\\['([a-zA-Z]+[a-zA-Z0-9\\._\\:\\-]*)'\\]).identity\\}"); + private final Matcher identityMatcher = IDENTITY_PATTERN.matcher(""); + private final List sessionMatchers; - private final List> subscribeMatchPredicates; - private final List> publishMatchPredicates; + private final List subscribeMatchers; + private final List publishMatchers; public MqttConditionMatcher( MqttRouteConfig route, MqttConditionConfig condition) { - this.sessionMatchers = - condition.sessions != null && !condition.sessions.isEmpty() ? - asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null; - - Matcher identityMatcher = IDENTITY_PATTERN.matcher(""); - this.subscribeMatchPredicates = - condition.subscribes != null && !condition.subscribes.isEmpty() ? - condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) - .collect(Collectors.toList()) : null; - this.publishMatchPredicates = - condition.publishes != null && !condition.publishes.isEmpty() ? - condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) - .collect(Collectors.toList()) : null; + this.sessionMatchers = condition.sessions != null && !condition.sessions.isEmpty() + ? asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) + : null; + this.subscribeMatchers = condition.subscribes != null && !condition.subscribes.isEmpty() + ? condition.subscribes.stream().map(s -> new TopicMatcher(route::identity, s.topic, s.params)).toList() + : null; + this.publishMatchers = condition.publishes != null && !condition.publishes.isEmpty() + ? condition.publishes.stream().map(p -> new TopicMatcher(route::identity, p.topic, p.params)).toList() + : null; } public boolean matchesSession( @@ -78,11 +79,11 @@ public boolean matchesSubscribe( String topic) { boolean match = false; - if (subscribeMatchPredicates != null) + if (subscribeMatchers != null) { - for (LongObjPredicate predicate : subscribeMatchPredicates) + for (TopicMatcher matcher : subscribeMatchers) { - match = predicate.test(authorization, topic); + match = matcher.matches(authorization, topic); if (match) { break; @@ -97,11 +98,11 @@ public boolean matchesPublish( String topic) { boolean match = false; - if (publishMatchPredicates != null) + if (publishMatchers != null) { - for (LongObjPredicate predicate : publishMatchPredicates) + for (TopicMatcher matcher : publishMatchers) { - match = predicate.test(authorization, topic); + match = matcher.matches(authorization, topic); if (match) { break; @@ -130,109 +131,87 @@ private static List asWildcardMatcher( return matchers; } - private static LongObjPredicate asTopicMatchPredicate( - String wildcard, - List params, - MqttRouteConfig route, - Matcher identityMatcher - ) + private final class TopicMatcher { - final Matcher topicMatcher = Pattern.compile(wildcard - .replace(".", "\\.") - .replace("$", "\\$") - .replace("+", "[^/]*") - .replace("#", ".*") - .replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher(""); - - Collection topicParams = topicMatcher.namedGroups().keySet(); - topicParams.stream() - .filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp))) - .forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp)); - - LongObjPredicate topicMatchPredicate; - if (params == null || params.isEmpty()) + private final Matcher matchTopic; + private final Map> matchParams; + + private TopicMatcher( + Function> identities, + String wildcard, + List params) { - topicMatchPredicate = (auth, topic) -> topicMatcher.reset(topic).matches(); + this.matchTopic = Pattern.compile(wildcard + .replace(".", "\\.") + .replace("$", "\\$") + .replace("+", "[^/]*") + .replace("#", ".*") + .replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher(""); + this.matchParams = params != null + ? params.stream() + .collect(Collectors.toMap( + p -> p.name, + p -> asTopicParamMatcher(identities, p.value))) + : null; + + Collection topicParams = matchTopic.namedGroups().keySet(); + topicParams.stream() + .filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp))) + .forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp)); + if (params != null) + { + params.stream() + .filter(p -> !topicParams.contains(p.name)) + .forEach(p -> System.out.printf("Undefined param constraint for MQTT topic %s: %s\n", wildcard, p.name)); + } } - else + + private boolean matches( + long authorization, + String topic) { - List guardedIdentityParams = new ArrayList<>(); - List constantParams = new ArrayList<>(); - for (MqttTopicParamConfig param : params) - { - if (!topicParams.contains(param.name)) - { - System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, param.name); - } - if (identityMatcher.reset(param.value).matches()) - { - String guard = identityMatcher.group(1); - guardedIdentityParams.add(new GuardedIdentityParam(param.name, guard)); - } - else + return matchTopic.reset(topic).matches() && + matchParams(name -> { - constantParams.add(param); - } - } - topicMatchPredicate = (auth, topic) -> - matchesWithParams(auth, topic, topicMatcher, route, guardedIdentityParams, constantParams); + try + { + return matchTopic.group(name); + } + catch (IllegalArgumentException e) + { + return null; + } + }, authorization); } - return topicMatchPredicate; - } + private boolean matchParams( + Function valuesByName, + long authorization) + { + return matchParams == null || + matchParams.entrySet().stream() + .allMatch(e -> e.getValue().test(authorization, valuesByName.apply(e.getKey()))); + } - private static boolean matchesWithParams( - long authorization, - String topic, - Matcher topicMatcher, - MqttRouteConfig route, - List guardedIdentityParams, - List constantParams) - { - boolean match = topicMatcher.reset(topic).matches(); - if (match) + private LongObjPredicate asTopicParamMatcher( + Function> identities, + String value) { - for (GuardedIdentityParam param : guardedIdentityParams) - { - String identity = route.identity(param.guard, authorization); - try - { - match = topicMatcher.group(param.name).equals(identity); - } - catch (IllegalArgumentException e) - { - match = false; - } - if (!match) - { - break; - } - } + return (identityMatcher.reset(value).matches()) + ? asTopicParamIdentityMatcher(identities.apply(identityMatcher.group(1))) + : asTopicParamValueMatcher(value); } - if (match) + + private static LongObjPredicate asTopicParamIdentityMatcher( + LongFunction identity) { - for (MqttTopicParamConfig param : constantParams) - { - try - { - match = topicMatcher.group(param.name).equals(param.value); - } - catch (IllegalArgumentException e) - { - match = false; - } - if (!match) - { - break; - } - } + return (a, v) -> v != null && identity != null && v.equals(identity.apply(a)); } - return match; - } - private record GuardedIdentityParam( - String name, - String guard) - { + private static LongObjPredicate asTopicParamValueMatcher( + String expected) + { + return (a, v) -> v != null && v.equals(expected); + } } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java index 2924ffbe3e..d01220c041 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java @@ -41,14 +41,14 @@ public MqttRouteConfig( RouteConfig route) { this.id = route.id; - this.when = route.when.stream() - .map(MqttConditionConfig.class::cast) - .map(c -> new MqttConditionMatcher(this, c)) - .collect(toList()); this.with = (MqttWithConfig) route.with; this.authorized = route.authorized; this.identities = route.guarded.stream() .collect(toMap(g -> g.name, g -> g.identity)); + this.when = route.when.stream() + .map(MqttConditionConfig.class::cast) + .map(c -> new MqttConditionMatcher(this, c)) + .collect(toList()); } public long compositeId() @@ -62,11 +62,10 @@ boolean authorized( return authorized.test(authorization); } - String identity( - String guard, - long authorization) + LongFunction identity( + String guard) { - return identities.getOrDefault(guard, a -> null).apply(authorization); + return identities.get(guard); } boolean matchesSession(