Skip to content

Commit

Permalink
Apply requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
epieffe committed Feb 9, 2025
1 parent 8688c66 commit d15893e
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ public class MqttPublishConfigBuilder<T> extends ConfigBuilder<T, MqttPublishCon

private String topic;

private final List<MqttTopicParamConfig> params;
private List<MqttTopicParamConfig> params;

MqttPublishConfigBuilder(
Function<MqttPublishConfig, T> mapper)
{
this.mapper = mapper;
this.params = new ArrayList<>();
}

@Override
Expand All @@ -53,6 +52,10 @@ public MqttPublishConfigBuilder<T> topic(
public MqttPublishConfigBuilder<T> param(
MqttTopicParamConfig param)
{
if (this.params == null)
{
this.params = new ArrayList<>();
}
this.params.add(param);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ public class MqttSubscribeConfigBuilder<T> extends ConfigBuilder<T, MqttSubscrib

private String topic;

private final List<MqttTopicParamConfig> params;
private List<MqttTopicParamConfig> params;

MqttSubscribeConfigBuilder(
Function<MqttSubscribeConfig, T> mapper)
{
this.mapper = mapper;
this.params = new ArrayList<>();
}

@Override
Expand All @@ -53,6 +52,10 @@ public MqttSubscribeConfigBuilder<T> topic(
public MqttSubscribeConfigBuilder<T> param(
MqttTopicParamConfig param)
{
if (this.params == null)
{
this.params = new ArrayList<>();
}
this.params.add(param);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization))
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(authorization, topic))
.findFirst()
.orElse(null);
}
Expand All @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization))
.filter(r -> r.authorized(authorization) && r.matchesPublish(authorization, topic))
.findFirst()
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,14 @@ public JsonObject adaptToJson(
{
JsonArrayBuilder subscribes = Json.createArrayBuilder();

mqttCondition.subscribes.forEach(s ->
mqttCondition.subscribes.forEach(sub ->
{
JsonObjectBuilder subscribeJson = Json.createObjectBuilder();
subscribeJson.add(TOPIC_NAME, s.topic);
if (!s.params.isEmpty())
subscribeJson.add(TOPIC_NAME, sub.topic);
if (sub.params != null && !sub.params.isEmpty())
{
JsonObjectBuilder params = Json.createObjectBuilder();
s.params.forEach(par ->
{
params.add(par.name, par.value);
});
sub.params.forEach(p -> params.add(p.name, p.value));
subscribeJson.add(PARAMS_NAME, params);
}
subscribes.add(subscribeJson);
Expand All @@ -97,17 +94,14 @@ public JsonObject adaptToJson(
{
JsonArrayBuilder publishes = Json.createArrayBuilder();

mqttCondition.publishes.forEach(p ->
mqttCondition.publishes.forEach(pub ->
{
JsonObjectBuilder publishJson = Json.createObjectBuilder();
publishJson.add(TOPIC_NAME, p.topic);
if (!p.params.isEmpty())
publishJson.add(TOPIC_NAME, pub.topic);
if (pub.params != null && !pub.params.isEmpty())
{
JsonObjectBuilder params = Json.createObjectBuilder();
p.params.forEach(par ->
{
params.add(par.name, par.value);
});
pub.params.forEach(p -> params.add(p.name, p.value));
publishJson.add(PARAMS_NAME, params);
}
publishes.add(publishJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,27 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.agrona.collections.ObjLongPredicate;
import org.agrona.collections.LongObjPredicate;

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicParamConfig;
import io.aklivity.zilla.runtime.engine.config.GuardedConfig;

public final class MqttConditionMatcher
{
private static final Pattern IDENTITY_PATTERN =
Pattern.compile("\\$\\{guarded(?:\\['([a-zA-Z]+[a-zA-Z0-9\\._\\:\\-]*)'\\]).identity\\}");

private final List<Matcher> sessionMatchers;
private final List<ObjLongPredicate<String>> subscribeMatchPredicates;
private final List<ObjLongPredicate<String>> publishMatchPredicates;
private final List<LongObjPredicate<String>> subscribeMatchPredicates;
private final List<LongObjPredicate<String>> publishMatchPredicates;

public MqttConditionMatcher(
MqttConditionConfig condition,
List<GuardedConfig> guarded)
MqttRouteConfig route,
MqttConditionConfig condition)
{
this.sessionMatchers =
condition.sessions != null && !condition.sessions.isEmpty() ?
Expand All @@ -49,11 +47,11 @@ public MqttConditionMatcher(
Matcher identityMatcher = IDENTITY_PATTERN.matcher("");
this.subscribeMatchPredicates =
condition.subscribes != null && !condition.subscribes.isEmpty() ?
condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, guarded, identityMatcher))
condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher))
.collect(Collectors.toList()) : null;
this.publishMatchPredicates =
condition.publishes != null && !condition.publishes.isEmpty() ?
condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, guarded, identityMatcher))
condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher))
.collect(Collectors.toList()) : null;
}

Expand All @@ -76,15 +74,15 @@ public boolean matchesSession(
}

public boolean matchesSubscribe(
String topic,
long authorization)
long authorization,
String topic)
{
boolean match = false;
if (subscribeMatchPredicates != null)
{
for (ObjLongPredicate<String> predicate : subscribeMatchPredicates)
for (LongObjPredicate<String> predicate : subscribeMatchPredicates)
{
match = predicate.test(topic, authorization);
match = predicate.test(authorization, topic);
if (match)
{
break;
Expand All @@ -95,15 +93,15 @@ public boolean matchesSubscribe(
}

public boolean matchesPublish(
String topic,
long authorization)
long authorization,
String topic)
{
boolean match = false;
if (publishMatchPredicates != null)
{
for (ObjLongPredicate<String> predicate : publishMatchPredicates)
for (LongObjPredicate<String> predicate : publishMatchPredicates)
{
match = predicate.test(topic, authorization);
match = predicate.test(authorization, topic);
if (match)
{
break;
Expand Down Expand Up @@ -132,10 +130,10 @@ private static List<Matcher> asWildcardMatcher(
return matchers;
}

private static ObjLongPredicate<String> asTopicMatchPredicate(
private static LongObjPredicate<String> asTopicMatchPredicate(
String wildcard,
List<MqttTopicParamConfig> params,
List<GuardedConfig> guarded,
MqttRouteConfig route,
Matcher identityMatcher
)
{
Expand All @@ -147,50 +145,77 @@ private static ObjLongPredicate<String> asTopicMatchPredicate(
.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher("");

Collection<String> topicParams = topicMatcher.namedGroups().keySet();
params.stream()
.filter(p -> !topicParams.contains(p.name))
.forEach(p -> System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, p.name));
topicParams.stream()
.filter(tp -> params.stream().noneMatch(p -> p.name.equals(tp)))
.filter(tp -> params == null || params.stream().noneMatch(p -> p.name.equals(tp)))
.forEach(tp -> System.out.format("Unconstrained param for MQTT topic %s: %s\n", wildcard, tp));

return params.isEmpty()
? (topic, auth) -> topicMatcher.reset(topic).matches()
: (topic, auth) -> matchesWithParams(topic, params, topicMatcher, identityMatcher, auth, guarded);
LongObjPredicate<String> topicMatchPredicate;
if (params == null || params.isEmpty())
{
topicMatchPredicate = (auth, topic) -> topicMatcher.reset(topic).matches();
}
else
{
List<GuardedIdentityParam> guardedIdentityParams = new ArrayList<>();
List<MqttTopicParamConfig> constantParams = new ArrayList<>();
for (MqttTopicParamConfig param : params)
{
if (!topicParams.contains(param.name))
{
System.out.format("Undefined param constraint for MQTT topic %s: %s\n", wildcard, param.name);
}
if (identityMatcher.reset(param.value).matches())
{
String guard = identityMatcher.group(1);
guardedIdentityParams.add(new GuardedIdentityParam(param.name, guard));
}
else
{
constantParams.add(param);
}
}
topicMatchPredicate = (auth, topic) ->
matchesWithParams(auth, topic, topicMatcher, route, guardedIdentityParams, constantParams);
}

return topicMatchPredicate;
}

private static boolean matchesWithParams(
long authorization,
String topic,
List<MqttTopicParamConfig> params,
Matcher topicMatcher,
Matcher identityMatcher,
long authorization,
List<GuardedConfig> guarded)
MqttRouteConfig route,
List<GuardedIdentityParam> guardedIdentityParams,
List<MqttTopicParamConfig> constantParams)
{
boolean match = topicMatcher.reset(topic).matches();
if (match)
{
for (MqttTopicParamConfig param : params)
for (GuardedIdentityParam param : guardedIdentityParams)
{
String value = param.value;
identityMatcher.reset(value);
if (identityMatcher.find())
String identity = route.identity(param.guard, authorization);
try
{
match = topicMatcher.group(param.name).equals(identity);
}
catch (IllegalArgumentException e)
{
String guardName = identityMatcher.group(1);
Optional<String> identity = guarded.stream()
.filter(g -> guardName.equals(g.name))
.findFirst()
.map(g -> g.identity.apply(authorization));
if (identity.isEmpty())
{
match = false;
break;
}
value = identityMatcher.replaceAll(identity.get());
match = false;
}
if (!match)
{
break;
}
}
}
if (match)
{
for (MqttTopicParamConfig param : constantParams)
{
try
{
match = topicMatcher.group(param.name).equals(value);
match = topicMatcher.group(param.name).equals(param.value);
}
catch (IllegalArgumentException e)
{
Expand All @@ -204,4 +229,10 @@ private static boolean matchesWithParams(
}
return match;
}

private record GuardedIdentityParam(
String name,
String guard)
{
}
}
Loading

0 comments on commit d15893e

Please sign in to comment.