-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathheath
148 lines (118 loc) · 5.62 KB
/
heath
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import asyncio
import json
import signal
import sys
import threading
from datetime import datetime, timedelta
import cv2
from azure.iot.device import Message, MethodResponse
from azure.iot.device.aio import IoTHubModuleClient
from azure.storage.blob import (BlobSasPermissions, BlobServiceClient,
ContentSettings, generate_blob_sas)
# NOTE: IoTHubModuleClient does not support the .get_storage_info_for_blob method.
# If needed, a SAS token could be derived from the IoT Edge 1.2 identity service instead.
# More info: https://kevinsaye.wordpress.com/2021/08/02/iot-edge-module-development-getting-the-certificate-and-sas-token-when-not-using-the-sdk/
# Event indicating client stop; set by the SIGTERM handler installed in main().
stop_event = threading.Event()
# Module client shared by the handlers below; assigned in create_client().
client = None
# should get these from the twin, but this is a quick sample
# URLToMonitor is the video source handed to cv2.VideoCapture; twin_handler()
# overwrites it when the desired properties contain "URLToMonitor".
URLToMonitor = "rtsp://ad***********168.15.205:554/"
# Storage account, key and container that uploadImage() writes JPEG frames to.
# NOTE(review): the account key is embedded in source; consider supplying it
# via the module twin or the environment instead.
blobStorageAccount = "kevinsaystorage1"
blobStorageKey = "24Q7K*****************t24z9dg=="
blobStorageContainer = "heath"
def create_client():
    """Create the module client and register its handlers.

    The client is built from the IoT Edge environment, stored in the
    module-level ``client`` variable, and wired to the direct-method and
    twin-patch handlers.

    Returns:
        The configured module client (the same object stored in ``client``).

    Raises:
        Re-raises whatever went wrong while attaching the handlers, after
        calling ``client.shutdown()``.
    """
    global client
    client = IoTHubModuleClient.create_from_edge_environment()

    async def method_request_handler(method_request):
        # Log every direct-method call together with its payload.
        print("{} {} with payload {}".format(
            datetime.now(), method_request.name, method_request.payload))
        # Every method name triggers the same action: grab and upload a frame.
        upload_result = await uploadImage()
        method_response = MethodResponse.create_from_method_request(
            method_request, 200, {"response": upload_result})
        await client.send_method_response(method_response)

    try:
        # for testing the recording process
        client.on_method_request_received = method_request_handler
        # deal with any twin changes and get the initial twin
        client.on_twin_desired_properties_patch_received = twin_handler
    except BaseException:
        # Cleanup if failure occurs.
        # NOTE(review): main() passes client.shutdown() to run_until_complete,
        # so the result discarded here is presumably an un-awaited coroutine;
        # confirm against the SDK.
        client.shutdown()
        raise
    return client
async def twin_handler(patch):
    """Apply a desired-properties patch and report the camera URL in use.

    Args:
        patch: Mapping of desired twin properties. Only the "URLToMonitor"
            key is looked at.

    If the patch carries "URLToMonitor", the module-level ``URLToMonitor`` is
    replaced and the new value is logged. In every case the current value is
    written back to the twin's reported properties.
    """
    global URLToMonitor
    if "URLToMonitor" in patch:
        URLToMonitor = patch["URLToMonitor"]
        print("{} URLToMonitor={}".format(datetime.now(), URLToMonitor))
    # Report whichever URL is now active (the new one, or the unchanged one).
    await client.patch_twin_reported_properties({"URLToMonitor": URLToMonitor})
async def module_run(client):
    """Apply the initial twin, then idle until the module is asked to stop.

    Args:
        client: The module client whose twin is fetched.

    Returns once the module-level ``stop_event`` has been set, so that the
    caller can shut the client down cleanly.
    """
    # getting the initial twin
    twin = await client.get_twin()
    await twin_handler(twin["desired"])
    # All real work happens in the registered handlers. Here we only keep the
    # event loop alive, polling stop_event so a termination request actually
    # ends the run instead of being ignored by an endless sleep.
    while not stop_event.is_set():
        await asyncio.sleep(1)
async def sendMessage(message):
    """Send ``message`` through the module client as a UTF-8 JSON message.

    Args:
        message: A JSON-serialisable object (it is passed to ``json.dumps``).
    """
    outgoing = Message(json.dumps(message))
    # Set content type and encoding on the outgoing message.
    outgoing.content_type = "application/json"
    outgoing.content_encoding = "utf-8"
    await client.send_message(outgoing)
async def uploadImageToLocalBlob():
    """Placeholder for uploading to local blob storage; currently does nothing.

    Returns:
        None.
    """
    return None
async def uploadImage():
    """Capture one frame from the camera, upload it as a JPEG, and report it.

    A single frame is read from ``URLToMonitor``, encoded as JPEG and uploaded
    to the configured blob container under a timestamp-based name. A message
    carrying the blob URL is then sent through ``sendMessage``. When the
    container has no public access, a read-only SAS token valid for one hour
    is appended to the URL.

    Returns:
        dict: ``{"datetime", "cloudURL", "camera"}`` once the upload succeeded,
        or ``{}`` when no frame could be read or a step before the message was
        built failed. Errors are logged and never propagated (best effort).
    """
    # NOTE(review): cv2 and the synchronous BlobServiceClient calls below are
    # not awaited, so they presumably block the event loop while they run.
    message = {}
    videosource = None
    try:
        videosource = cv2.VideoCapture(URLToMonitor)
        ok, image = videosource.read()
        if ok:
            # using cloud blob storage
            jpegImage = cv2.imencode(".jpg", image)[1].tobytes()
            blobServiceClient = BlobServiceClient(
                account_url="https://" + blobStorageAccount + ".blob.core.windows.net",
                credential=blobStorageKey)
            blobContainerClient = blobServiceClient.get_container_client(blobStorageContainer)
            fileName = str(datetime.now().timestamp()) + ".jpg"
            blobClient = blobContainerClient.get_blob_client(fileName)
            content_settings = ContentSettings(content_type="image/jpeg")
            blobClient.upload_blob(jpegImage, blob_type="BlockBlob", overwrite=True,
                                   content_settings=content_settings)

            if blobContainerClient.get_container_access_policy()["public_access"] is None:
                # Private container: hand out a short-lived, read-only SAS URL.
                sasToken = generate_blob_sas(
                    account_name=blobServiceClient.account_name,
                    container_name=blobStorageContainer,
                    account_key=blobServiceClient.credential.account_key,
                    blob_name=fileName,
                    permission=BlobSasPermissions(read=True),
                    expiry=datetime.utcnow() + timedelta(hours=1))
                cloudURL = blobClient.url + "?" + sasToken
            else:
                cloudURL = blobClient.url

            message = {"datetime": str(datetime.now()), "cloudURL": cloudURL, "camera": URLToMonitor}
            await sendMessage(message)
            print(str(datetime.now()) + " " + str(message))
        else:
            print(str(datetime.now()) + " uploadImage: no frame could be read from " + str(URLToMonitor))
    except Exception as error:
        # Stay best-effort, but leave a trace instead of silently swallowing the
        # failure. Catching Exception (not a bare except) lets task cancellation
        # and KeyboardInterrupt propagate.
        print(str(datetime.now()) + " uploadImage failed: " + repr(error))
    finally:
        # Always free the capture handle, even when the upload or send failed.
        if videosource is not None:
            videosource.release()
    return message
def main():
    """Entry point: create the client, run the module, and shut down cleanly.

    Raises:
        Exception: If the interpreter is older than Python 3.5.3, or if the
            module run fails (the error is logged and re-raised).
    """
    # Compare version tuples, not strings: as text, "3.10" sorts before "3.5.3",
    # which made the old check reject Python 3.10 and newer.
    if sys.version_info < (3, 5, 3):
        raise Exception("The sample requires python 3.5.3+. Current version of Python: %s" % sys.version)
    print("IoT Hub Client for Python")

    # NOTE: Client is implicitly connected due to the handler being set on it
    client = create_client()

    # Define a handler to clean up when the module is terminated by Edge
    def module_termination_handler(signum, frame):
        print("IoTHubClient sample stopped by Edge")
        stop_event.set()

    # Set the Edge termination handler
    signal.signal(signal.SIGTERM, module_termination_handler)

    # Run the sample
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(module_run(client))
    except Exception as e:
        print("Unexpected error %s " % e)
        raise
    finally:
        print("Shutting down IoT Hub Client...")
        loop.run_until_complete(client.shutdown())
        loop.close()
# Run the module only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()