Skip to content

Commit

Permalink
Apply suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
epieffe committed Feb 22, 2025
1 parent a189179 commit b9738b8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonString;
import jakarta.json.bind.adapter.JsonbAdapter;

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
Expand Down Expand Up @@ -145,10 +144,13 @@ public ConditionConfig adaptFromJson(

if (subscribeJson.containsKey(PARAMS_NAME))
{
subscribeJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> subscribe.param()
.name(n)
.value(JsonString.class.cast(v).getString())
.build());
JsonObject paramsJson = subscribeJson.getJsonObject(PARAMS_NAME);

paramsJson.keySet().forEach(n ->
subscribe.param()
.name(n)
.value(paramsJson.getString(n))
.build());
}

subscribe.build();
Expand All @@ -168,10 +170,13 @@ public ConditionConfig adaptFromJson(

if (publishJson.containsKey(PARAMS_NAME))
{
publishJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> publish.param()
.name(n)
.value(JsonString.class.cast(v).getString())
.build());
JsonObject paramsJson = publishJson.getJsonObject(PARAMS_NAME);

paramsJson.keySet().forEach(n ->
publish.param()
.name(n)
.value(paramsJson.getString(n))
.build());
}

publish.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -32,27 +35,25 @@ public final class MqttConditionMatcher
private static final Pattern IDENTITY_PATTERN =
Pattern.compile("\\$\\{guarded(?:\\['([a-zA-Z]+[a-zA-Z0-9\\._\\:\\-]*)'\\]).identity\\}");

private final Matcher identityMatcher = IDENTITY_PATTERN.matcher("");

private final List<Matcher> sessionMatchers;
private final List<LongObjPredicate<String>> subscribeMatchPredicates;
private final List<LongObjPredicate<String>> publishMatchPredicates;
private final List<TopicMatcher> subscribeMatchers;
private final List<TopicMatcher> publishMatchers;

public MqttConditionMatcher(
MqttRouteConfig route,
MqttConditionConfig condition)
{
this.sessionMatchers =
condition.sessions != null && !condition.sessions.isEmpty() ?
asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null;

Matcher identityMatcher = IDENTITY_PATTERN.matcher("");
this.subscribeMatchPredicates =
condition.subscribes != null && !condition.subscribes.isEmpty() ?
condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher))
.collect(Collectors.toList()) : null;
this.publishMatchPredicates =
condition.publishes != null && !condition.publishes.isEmpty() ?
condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher))
.collect(Collectors.toList()) : null;
this.sessionMatchers = condition.sessions != null && !condition.sessions.isEmpty()
? asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList()))
: null;
this.subscribeMatchers = condition.subscribes != null && !condition.subscribes.isEmpty()
? condition.subscribes.stream().map(s -> new TopicMatcher(route::identity, s.topic, s.params)).toList()
: null;
this.publishMatchers = condition.publishes != null && !condition.publishes.isEmpty()
? condition.publishes.stream().map(p -> new TopicMatcher(route::identity, p.topic, p.params)).toList()
: null;
}

public boolean matchesSession(
Expand All @@ -78,11 +79,11 @@ public boolean matchesSubscribe(
String topic)
{
boolean match = false;
if (subscribeMatchPredicates != null)
if (subscribeMatchers != null)
{
for (LongObjPredicate<String> predicate : subscribeMatchPredicates)
for (TopicMatcher matcher : subscribeMatchers)
{
match = predicate.test(authorization, topic);
match = matcher.matches(authorization, topic);
if (match)
{
break;
Expand All @@ -97,11 +98,11 @@ public boolean matchesPublish(
String topic)
{
boolean match = false;
if (publishMatchPredicates != null)
if (publishMatchers != null)
{
for (LongObjPredicate<String> predicate : publishMatchPredicates)
for (TopicMatcher matcher : publishMatchers)
{
match = predicate.test(authorization, topic);
match = matcher.matches(authorization, topic);
if (match)
{
break;
Expand Down Expand Up @@ -130,109 +131,87 @@ private static List<Matcher> asWildcardMatcher(
return matchers;
}

private static LongObjPredicate<String> asTopicMatchPredicate(
String wildcard,
List<MqttTopicParamConfig> params,
MqttRouteConfig route,
Matcher identityMatcher
)
private final class TopicMatcher
{
final Matcher topicMatcher = Pattern.compile(wildcard
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*")
.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher("");

Collection<String> topicParams = topicMatcher.namedGroups().keySet();
topicParams.stream()
.filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp)))
.forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp));

LongObjPredicate<String> topicMatchPredicate;
if (params == null || params.isEmpty())
private final Matcher matchTopic;
private final Map<String, LongObjPredicate<String>> matchParams;

private TopicMatcher(
Function<String, LongFunction<String>> identities,
String wildcard,
List<MqttTopicParamConfig> params)
{
topicMatchPredicate = (auth, topic) -> topicMatcher.reset(topic).matches();
this.matchTopic = Pattern.compile(wildcard
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*")
.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher("");
this.matchParams = params != null
? params.stream()
.collect(Collectors.toMap(
p -> p.name,
p -> asTopicParamMatcher(identities, p.value)))
: null;

Collection<String> topicParams = matchTopic.namedGroups().keySet();
topicParams.stream()
.filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp)))
.forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp));
if (params != null)
{
params.stream()
.filter(p -> !topicParams.contains(p.name))
.forEach(p -> System.out.printf("Undefined param constraint for MQTT topic %s: %s\n", wildcard, p.name));
}
}
else

private boolean matches(
long authorization,
String topic)
{
List<GuardedIdentityParam> guardedIdentityParams = new ArrayList<>();
List<MqttTopicParamConfig> constantParams = new ArrayList<>();
for (MqttTopicParamConfig param : params)
{
if (!topicParams.contains(param.name))
{
System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, param.name);
}
if (identityMatcher.reset(param.value).matches())
{
String guard = identityMatcher.group(1);
guardedIdentityParams.add(new GuardedIdentityParam(param.name, guard));
}
else
return matchTopic.reset(topic).matches() &&
matchParams(name ->
{
constantParams.add(param);
}
}
topicMatchPredicate = (auth, topic) ->
matchesWithParams(auth, topic, topicMatcher, route, guardedIdentityParams, constantParams);
try
{
return matchTopic.group(name);
}
catch (IllegalArgumentException e)
{
return null;
}
}, authorization);
}

return topicMatchPredicate;
}
private boolean matchParams(
Function<String, String> valuesByName,
long authorization)
{
return matchParams == null ||
matchParams.entrySet().stream()
.allMatch(e -> e.getValue().test(authorization, valuesByName.apply(e.getKey())));
}

private static boolean matchesWithParams(
long authorization,
String topic,
Matcher topicMatcher,
MqttRouteConfig route,
List<GuardedIdentityParam> guardedIdentityParams,
List<MqttTopicParamConfig> constantParams)
{
boolean match = topicMatcher.reset(topic).matches();
if (match)
private LongObjPredicate<String> asTopicParamMatcher(
Function<String, LongFunction<String>> identities,
String value)
{
for (GuardedIdentityParam param : guardedIdentityParams)
{
String identity = route.identity(param.guard, authorization);
try
{
match = topicMatcher.group(param.name).equals(identity);
}
catch (IllegalArgumentException e)
{
match = false;
}
if (!match)
{
break;
}
}
return (identityMatcher.reset(value).matches())
? asTopicParamIdentityMatcher(identities.apply(identityMatcher.group(1)))
: asTopicParamValueMatcher(value);
}
if (match)

private static LongObjPredicate<String> asTopicParamIdentityMatcher(
LongFunction<String> identity)
{
for (MqttTopicParamConfig param : constantParams)
{
try
{
match = topicMatcher.group(param.name).equals(param.value);
}
catch (IllegalArgumentException e)
{
match = false;
}
if (!match)
{
break;
}
}
return (a, v) -> v != null && identity != null && v.equals(identity.apply(a));
}
return match;
}

private record GuardedIdentityParam(
String name,
String guard)
{
private static LongObjPredicate<String> asTopicParamValueMatcher(
String expected)
{
return (a, v) -> v != null && v.equals(expected);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public MqttRouteConfig(
RouteConfig route)
{
this.id = route.id;
this.when = route.when.stream()
.map(MqttConditionConfig.class::cast)
.map(c -> new MqttConditionMatcher(this, c))
.collect(toList());
this.with = (MqttWithConfig) route.with;
this.authorized = route.authorized;
this.identities = route.guarded.stream()
.collect(toMap(g -> g.name, g -> g.identity));
this.when = route.when.stream()
.map(MqttConditionConfig.class::cast)
.map(c -> new MqttConditionMatcher(this, c))
.collect(toList());
}

public long compositeId()
Expand All @@ -62,11 +62,10 @@ boolean authorized(
return authorized.test(authorization);
}

String identity(
String guard,
long authorization)
LongFunction<String> identity(
String guard)
{
return identities.getOrDefault(guard, a -> null).apply(authorization);
return identities.get(guard);
}

boolean matchesSession(
Expand Down

0 comments on commit b9738b8

Please sign in to comment.