diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java index 8b33e513f8..b54509c26a 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java @@ -66,6 +66,16 @@ public void shouldPublishOneMessage() throws Exception k3po.finish(); } + @Test + @Configuration("server.guarded.identity.topic.param.yaml") + @Specification({ + "${net}/publish.topic.guarded.identity.param/client", + "${app}/publish.topic.guarded.identity.param/server"}) + public void shouldPublishToTopicWithGuardedIdentityParam() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml new file mode 100644 index 0000000000..41ad54e493 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/config/server.guarded.identity.topic.param.yaml @@ -0,0 +1,48 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +guards: + test0: + type: test + options: + credentials: TOKEN + lifetime: PT5S + challenge: PT5S +bindings: + net0: + type: mqtt + kind: server + options: + authorization: + test0: + credentials: + connect: + username: Bearer {credentials} + routes: + - exit: app0 + guarded: + test0: [] + when: + - publish: + - topic: topic/{id}/pub + params: + id: ${guarded['test0'].identity} + - subscribe: + - topic: topic/{id}/sub + params: + id: ${guarded['test0'].identity} diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..66063e56b8 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/client.rpt @@ -0,0 +1,69 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty +read notify RECEIVED_SESSION_STATE + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("topic/test/pub") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..f99c56f60f --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.topic.guarded.identity.param/server.rpt @@ -0,0 +1,67 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:authorization 1L + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("topic/test/pub") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt new file mode 100644 index 0000000000..7e552c6135 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/client.rpt @@ -0,0 +1,42 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +write [0x30 0x18] # PUBLISH + [0x00 0x0e] "topic/test/pub" # topic name + [0x00] # properties + "message" # payload diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt new file mode 100644 index 0000000000..38f7ffea07 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.topic.guarded.identity.param/server.rpt @@ -0,0 +1,43 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x26] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x82] # flags = username, clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + [0x00 0x0c] "Bearer TOKEN" # username + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties = none + +read [0x30 0x18] # PUBLISH + [0x00 0x0e] "topic/test/pub" # topic name + [0x00] # properties + "message" # payload diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java index 54023b6817..02a5e0f8d7 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/config/SchemaTest.java @@ -130,6 +130,14 @@ public void shouldValidateServerWithTopicValidators() assertThat(config, not(nullValue())); } + @Test + public void shouldValidateServerWithGuardedIdentityTopicParam() + { + JsonObject config = schema.validate("server.guarded.identity.topic.param.yaml"); + + assertThat(config, not(nullValue())); + } + @Test public void shouldValidateServerWithUserPropertiesValidators() { diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java index cb47367fe3..ba33b77845 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java @@ -46,6 +46,15 @@ public void shouldSendOneMessage() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/publish.topic.guarded.identity.param/client", + "${net}/publish.topic.guarded.identity.param/server"}) + public void shouldPublishToTopicWithGuardedIdentityParam() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/publish.one.message.disconnect/client",