Skip to content

Commit 691bb92

Browse files
author
KC3ZVD
committed
feat(publishers.mqtt): connecting and publishing test to mqtt successful
1 parent 13a7645 commit 691bb92

File tree

3 files changed

+61
-27
lines changed

3 files changed

+61
-27
lines changed

src/kc3zvd/iot_state/cli/__init__.py

+32-18
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,41 @@
77

88
@click.group(context_settings={"help_option_names": ["-h", "--help"], "auto_envvar_prefix": "IOT"}, invoke_without_command=False)
99
@click.version_option(version=__version__, prog_name="iot-state")
10-
@click.option('--redis-host', help="Redis instance host", default='localhost')
11-
@click.option('--redis-port', help="Redis instance port", default=6479, type=int)
10+
@click.option('--redis-username', help="Redis instance username", default='', type=str)
11+
@click.option('--redis-password', help="Redis instance password", default='', type=str)
12+
@click.option('--redis-host', help="Redis instance host", default='localhost', type=str)
13+
@click.option('--redis-port', help="Redis instance port", default=6379, type=int)
1214
@click.option('--redis-db', help="Redis instance DB number", default=0, type=int)
1315
@click.pass_context
14-
def iot_state(ctx, redis_host, redis_port, redis_db):
15-
ctx.ensure_object(dict)
16-
ctx.obj['redis_host'] = redis_host
17-
ctx.obj['redis_port'] = redis_port
18-
ctx.obj['redis_db'] = redis_db
19-
click.echo("Starting IOT state platform")
16+
def iot_state(ctx, redis_username, redis_password, redis_host, redis_port, redis_db):
17+
ctx.ensure_object(dict)
18+
19+
if redis_username or redis_password:
20+
if not redis_username or not redis_password:
21+
click.echo("Provide both username and password for redis")
22+
die()
23+
ctx.obj['redis_url'] = f"redis://{redis_username}:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
24+
else:
25+
ctx.obj['redis_url'] = f"redis://{redis_host}:{redis_port}/{redis_db}"
26+
click.echo("Starting IOT state platform")
27+
2028
@iot_state.command()
2129
@click.option('--platform', help="The platform to publish to", required=True,
2230
type=click.Choice(['mqtt'], case_sensitive=False))
31+
@click.option('--mqtt-host', help="The MQTT host to connect to", default='localhost', type=str)
32+
@click.option('--mqtt-port', help="The port to use to connect to the MQTT host", default=1883, type=int)
2333
@click.pass_context
24-
def publisher(ctx, platform):
25-
match platform:
26-
case 'mqtt':
27-
click.echo("mqtt platform selected")
28-
mqtt.run(
29-
redis_host=ctx.obj['redis_host'],
30-
redis_port=ctx.obj['redis_port'],
31-
redis_db=ctx.obj['redis_db'])
32-
case _:
33-
die()
34+
def publisher(ctx, platform, mqtt_host, mqtt_port):
35+
mqtt_url = ""
36+
match platform:
37+
case 'mqtt':
38+
click.echo("mqtt platform selected")
39+
mqtt.run(
40+
redis_url=ctx.obj['redis_url'],
41+
publisher={
42+
"mqtt_host": mqtt_host,
43+
"mqtt_port": mqtt_port
44+
}
45+
)
46+
case _:
47+
die()
+25-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,37 @@
11
from __future__ import annotations
22
from kc3zvd.iot_state.subscribers import redis
3+
import paho.mqtt.client as mqtt
34
import asyncio
5+
import json
6+
47
# prefix/device_type/area_name/device_name/state_class
8+
class MQTTPublisher:
9+
10+
def __init__(self, host, port):
11+
self.host = host
12+
self.port = port
513

6-
async def reader(channel: redis.client.PubSub):
14+
async def handle_messages(channel: redis.client.PubSub, publisher: str):
15+
publisher = json.loads(publisher)
16+
17+
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
18+
mqttc.connect(host=publisher['mqtt_host'], port=publisher['mqtt_port'])
19+
mqttc.loop_start()
720
while True:
821
message = await channel.get_message(ignore_subscribe_messages=True)
922
if message is not None:
1023
print(f"(Reader) Message Received: {message}")
24+
mqttc.publish(topic="test/message", payload=message['data']).wait_for_publish()
25+
mqttc.disconnect()
26+
mqttc.loop_stop()
27+
28+
async def subscribe(redis_url: str, publisher: str):
29+
update = asyncio.create_task(redis.subscribe('device:state:update', handle_messages, redis_url, publisher))
30+
create = asyncio.create_task(redis.subscribe('device:state:create', handle_messages, redis_url, publisher))
31+
await update
32+
await create
1133

12-
def run(redis_host: str = 'localhost', redis_port: int = 6379, redis_db: int = 0):
13-
asyncio.run(redis.subscribe('publish', reader, redis_host, redis_port, redis_db))
34+
def run(redis_url: str, publisher: dict):
35+
asyncio.run(subscribe(redis_url, json.dumps(publisher)))
1436

1537

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
from __future__ import annotations
22
import redis.asyncio as redis
33
import asyncio
4-
STOPWORD = "STOP"
54

5+
async def subscribe(channel: str, callback: Callable[[], None], redis_url: str, publisher: str):
6+
client = redis.Redis.from_url(redis_url)
67

7-
async def subscribe(channel: str, callback: Callable[[], None], redis_host: int, redis_port: int, redis_db: int):
88
print(f"Subscribing to channel: {channel}")
9-
client = redis.Redis()
10-
119
async with client.pubsub() as pubsub:
12-
await pubsub.subscribe(channel)
13-
future = asyncio.create_task(callback(pubsub))
10+
await pubsub.subscribe(channel, publisher)
11+
future = asyncio.create_task(callback(pubsub, publisher))
1412
await future

0 commit comments

Comments
 (0)