Skip to content

Commit f305f80

Browse files
author
KC3ZVD
committed
feat: initial services definition
1 parent 1c52643 commit f305f80

File tree

6 files changed

+262
-0
lines changed

6 files changed

+262
-0
lines changed

src/kc3zvd/iot_state/listeners/__init__.py

Whitespace-only changes.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from zeroconf import ServiceStateChange, Zeroconf
2+
from typing import Optional, cast
3+
from iot_state.workers import wled
4+
import logging
5+
import asyncio
6+
import sys
7+
import os
8+
9+
from zeroconf.asyncio import (
10+
AsyncServiceBrowser,
11+
AsyncServiceInfo,
12+
AsyncZeroconf,
13+
)
14+
15+
_PENDING_TASKS: set[asyncio.Task] = set()
16+
_WLED_SUPPORT = "0.15.0"
17+
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL")
18+
CELERY_BACKEND_URL = os.getenv("CELERY_BACKEND_URL")
19+
CELERY_RESULTS_URL = os.getenv("CELERY_RESULTS_URL")
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def async_on_service_state_change(
25+
zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange
26+
) -> None:
27+
if not name.startswith('wled'):
28+
logger.debug(f"Service {name} does not appear to be a WLED service, skipping...")
29+
return
30+
if state_change is not ServiceStateChange.Added:
31+
logger.debug(f"State Change: {state_change}")
32+
return
33+
task = asyncio.ensure_future(async_publish_service_info(zeroconf, service_type, name))
34+
_PENDING_TASKS.add(task)
35+
task.add_done_callback(_PENDING_TASKS.discard)
36+
37+
38+
async def async_publish_service_info(zeroconf: Zeroconf, service_type: str, name: str) -> None:
39+
info = AsyncServiceInfo(service_type, name)
40+
await info.async_request(zeroconf, 3000)
41+
logger.debug("Info from zeroconf.get_service_info: %r" % (info))
42+
if info:
43+
addresses = ["%s:%d" % (addr, cast(int, info.port)) for addr in info.parsed_scoped_addresses()]
44+
45+
46+
details = {
47+
"source": 'mdns',
48+
"platform": 'wled',
49+
"attributes": {
50+
"addresses": addresses
51+
}
52+
}
53+
54+
wled.discover.delay(details=details)
55+
56+
logger.info(f"Adding service {name} to MQTT")
57+
else:
58+
logger.warning("No service info available for %s, skipping..." % (name))
59+
60+
class AsyncRunner:
61+
def __init__(self) -> None:
62+
self.aiobrowser: Optional[AsyncServiceBrowser] = None
63+
self.aiozc: Optional[AsyncZeroconf] = None
64+
65+
async def async_run(self) -> None:
66+
self.aiozc = AsyncZeroconf()
67+
68+
services = ["_http._tcp.local."]
69+
logger.debug("Watching for %s service(s)" % services)
70+
logger.info("Monitoring mDNS...")
71+
72+
self.aiobrowser = AsyncServiceBrowser(
73+
self.aiozc.zeroconf, services, handlers=[async_on_service_state_change]
74+
)
75+
while True:
76+
await asyncio.sleep(1)
77+
78+
async def async_close(self) -> None:
79+
assert self.aiozc is not None
80+
assert self.aiobrowser is not None
81+
await self.aiobrowser.async_cancel()
82+
await self.aiozc.async_close()
83+
84+
def listen() -> None:
85+
86+
# Set up logging
87+
logger.setLevel(level=logging.DEBUG)
88+
handler = logging.StreamHandler(sys.stdout)
89+
handler.setLevel(level=logging.DEBUG)
90+
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
91+
handler.setFormatter(formatter)
92+
logger.addHandler(handler)
93+
94+
# set up event loop
95+
loop = asyncio.get_event_loop()
96+
runner = AsyncRunner()
97+
98+
try:
99+
loop.run_until_complete(runner.async_run())
100+
except KeyboardInterrupt:
101+
loop.run_until_complete(runner.async_close())
102+
103+
if __name__ == "__main__":
104+
listen()

src/kc3zvd/iot_state/wled/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from dataclasses import dataclass
2+
import paho.mqtt.publish as publish
3+
import json
4+
import os
5+
6+
APP_MQTT_HOST = os.getenv("APP_MQTT_HOST")
7+
APP_MQTT_PORT = os.getenv("APP_MQTT_PORT", 1883)
8+
APP_MQTT_PROTO = os.getenv("APP_MQTT_PROTO", "mqtt")
9+
APP_MQTT_USER = os.getenv("APP_MQTT_USER")
10+
APP_MQTT_PASSWORD = os.getenv("APP_MQTT_PASSWORD")
11+
12+
@app.task
13+
def pub(details: str) -> None:
14+
platform = json.loads(details)['platform']
15+
topic = "%s" % (platform)
16+
payload = details
17+
18+
MQTTPublisher().publish(
19+
payload = payload,
20+
topic = topic
21+
)
22+
23+
24+
@dataclass
25+
class MQTTPublisher:
26+
"""
27+
Publishes messages to MQTT
28+
29+
Attributes:
30+
host: MQTT Host
31+
port: MQTT Port
32+
proto: MQTT Protocol. Currently supports `mqtt`
33+
user: MQTT User
34+
password: MQTT Password
35+
"""
36+
host: str = APP_MQTT_HOST
37+
port: int = APP_MQTT_PORT
38+
proto: str = APP_MQTT_PROTO
39+
user: str = APP_MQTT_USER
40+
password: str = APP_MQTT_PASSWORD
41+
42+
def __post_init__(self):
43+
print('Publisher init complete.')
44+
45+
def publish(self, payload, topic, qos=1):
46+
auth = None
47+
if self.user:
48+
auth = {
49+
"username": self.user,
50+
"password": self.password
51+
}
52+
publish.single(
53+
topic=topic,
54+
payload=payload,
55+
qos=qos,
56+
hostname=self.host,
57+
port=self.port,
58+
auth=auth
59+
)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from celery import Celery
2+
import os
3+
import logging
4+
5+
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL")
6+
CELERY_BACKEND_URL = os.getenv("CELERY_BACKEND_URL")
7+
CELERY_RESULTS_URL = os.getenv("CELERY_RESULTS_URL")
8+
CELERY_LOG_LEVEL = os.getenv("CELERY_LOG_LEVEL", logging.INFO)
9+
10+
app = Celery('iot-state')
11+
12+
@app.task
13+
def state_changed() -> None:
14+
pass
15+
16+
def mqtt_notify( topic_prefix: str) -> None:
17+
# {topic_prefix}/{type}/{area}/{name}/{subtype}
18+
#
19+
# {type}: One of sensor, switch, notification, meter
20+
#
21+
# TODO: Ensure inputs are sanitized
22+
pass
23+
24+
def discord_notify() -> None:
25+
pass
26+
27+
def webhook_notify() -> None:
28+
pass

src/kc3zvd/iot_state/workers/wled.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import logging
2+
import requests
3+
from celery import Celery
4+
import os
5+
from iot_state.devices import Device
6+
from mongoengine import connect, MultipleObjectsReturned, DoesNotExist
7+
8+
9+
10+
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL")
11+
CELERY_BACKEND_URL = os.getenv("CELERY_BACKEND_URL")
12+
CELERY_RESULTS_URL = os.getenv("CELERY_RESULTS_URL")
13+
CELERY_LOG_LEVEL = os.getenv("CELERY_LOG_LEVEL", logging.INFO)
14+
MONGODB_URL = os.getenv("MONGODB_URL")
15+
16+
app = Celery('iot-state')
17+
18+
# Set up logging
19+
logger = logging.getLogger(__name__)
20+
logger.setLevel(level=CELERY_LOG_LEVEL)
21+
22+
@app.task
23+
def discover(details: dict) -> None:
24+
25+
for address in details['attributes']['addresses']:
26+
url = "http://%s/json/info" % address
27+
try:
28+
logger.info("Attempting to retrieve info for wled device at %s" % (address))
29+
r = requests.get(url)
30+
if r.status_code != 200:
31+
error_message = "Recieved status code %s from %s, expected 200" % (r.status_code, url)
32+
logger.error(error_message)
33+
logger.error("Full response: \n %s" % r.content.decode())
34+
else:
35+
process_discovered_device.delay(details, r.json())
36+
except Exception as e:
37+
error_message = "Exception while retrieving info from %s." % (url)
38+
logger.error(error_message, exc_info=e)
39+
continue
40+
41+
@app.task
42+
def process_discovered_device(details: dict, state: dict) -> None:
43+
logger.debug("Details: %s" % (details))
44+
logger.debug("State: %s" % (state))
45+
connect(host=MONGODB_URL)
46+
device_id = state['mac']
47+
platform = details['platform']
48+
logger.info("Determining if device %s exists in state" % (device_id))
49+
50+
try:
51+
device = Device.objects.get(platform_id = device_id)
52+
logger.info("Device %s exists" % (device.platform_id))
53+
except MultipleObjectsReturned:
54+
logger.warning("Multiple matching devices found")
55+
except DoesNotExist:
56+
logger.info("Existing device not found, proceeding to bootstrap state")
57+
device = Device(
58+
platform_id=device_id,
59+
platform=details['platform'],
60+
discovery_source=details['source'])
61+
device = device.save()
62+
logger.info("Device saved with ID %s" % (device_id))
63+
register_state_watcher.delay(device_id)
64+
65+
66+
@app.task
67+
def register_state_watcher(device_id):
68+
connect(host=MONGODB_URL)
69+
device = Device.objects.get(platform_id = device_id)
70+
logger.info("Received request to monitor state changes for %s on platform %s" % (device.platform_id, device.platform))

0 commit comments

Comments
 (0)