-
I am trying to build a device client that is responsible for managing the IoT device. Idea is to have a process running all the time which is responsible for provisioning the device and communicate with its shadows for future updates and state management. It receives requests over zeromq topics to perform some operations. Here's my code. class ControlAgent(DeviceAgent):
def __init__(self):
super().__init__()
self.template_name = self.client_config.get("fleetProvisioningTemplateName")
self.sw_version = importlib.metadata.version("ilaas_device_client")
self.data_agent_endpoint = "ipc:///tmp/dataAgent"
self.jobs_agent_endpoint = "ipc:///tmp/jobsAgent"
self.data_socket = self.zmq.context.socket(_zmq.REQ)
self.data_socket.connect(self.data_agent_endpoint)
self.jobs_socket = self.zmq.context.socket(_zmq.REQ)
self.jobs_socket.connect(self.jobs_agent_endpoint)
self.create_keys_response = None
self.register_thing_response = None
self.thing_name = None
self.is_provisioned = False
self._initialize_device()
if self.is_provisioned:
self._get_mqtt_client()
asyncio.create_task(self._subscribe_to_shadows())
def _initialize_device(self):
"""Loads device configuration on startup and subscribes to named shadows if provisioned."""
try:
# TODO: Need a way to get the serial number
device_config = self.db_manager.load_device_config(serial_number="ZN012345")
if device_config:
logger.info(
"[CONTROL] Device is already provisioned. Loading device configuration..."
)
self.device_config = device_config
self.thing_name = self.device_config["thingName"]
self.is_provisioned = True
except Exception as e:
logger.warning(
f"[CONTROL] No existing device configuration found. Device is not provisioned. {e}"
)
async def _subscribe_to_shadows(self):
tasks = [
self.subscribe_to_named_shadow_updates(
self.thing_name, "reader-config-policy"
),
]
await asyncio.gather(*tasks)
async def handle_request(self, message):
"""Handles incoming REQ-REP messages."""
command = message.get("command")
logger.info(f"[CONTROL] Received command {command}.")
if command == "onboard":
return await self.onboard(message)
elif command == "boot":
return await self.boot()
elif command == "shutdown":
return await self.shutdown()
elif command == "config":
return await self.get_device_config(message)
# elif command == "configure_data_agent":
# return await self.configure_data_agent(message["config"])
# elif command == "configure_jobs_agent":
# return await self.configure_jobs_agent(message["config"])
else:
return {"error": "Unknown command"}
async def onboard(self, message):
"""
Starts AWS IoT Fleet Provisioning using the device's MAC address.
"""
serial_number = message.get("serialNumber")
config = await self._load_device_config(serial_number)
if config:
logger.info("[CONTROL] Device already provisioned.")
return {"error": "Device already provisioned."}
logger.info("[CONTROL] Starting IoT Fleet Provisioning...")
self.create_keys_response = None
self.register_thing_response = None
# TODO: validate request message.
# Create MQTT Connection
mqtt_client = self._get_mqtt_client(provisioning=True)
# Initialize AWS IoT Identity Client
identity_client = iotidentity.IotIdentityClient(mqtt_client)
# CreateKeysAndCertificate
await self._create_device_cert(identity_client)
await self._wait_for("create_keys_response", 10, "CreateKeysAndCertificate")
if not self.create_keys_response:
return {"error": "CreateKeysAndCertificate failed"}
# RegisterThing
await self._register_thing(identity_client, serial_number, message)
await self._wait_for("register_thing_response", 10, "RegisterThing")
if not self.register_thing_response:
return {"error": "RegisterThing failed"}
logger.info("[CONTROL] Storing device certificates.")
self.keystore.store_device_certificates(
cert_pem=self.create_keys_response.certificate_pem,
cert_key=self.create_keys_response.private_key,
)
logger.info("[CONTROL] Saving device configuration...")
thing_name = self.register_thing_response.thing_name
self.db_manager.save_device_config(
serial_number,
{...},
)
self.device_config = await self._load_device_config(serial_number)
self.is_provisioned = True
self.thing_name = thing_name
# Wait for configuration file to be saved
await asyncio.sleep(3)
self._restart_mqtt()
# Subscribe to named shadows
await self._subscribe_to_shadows()
await asyncio.sleep(3)
# Update named shadows
await self.publish_named_shadow_update(
thing_name, "reader-config-policy", reader_config
)
logger.info(f"[CONTROL] Device onboarding complete {self.mqtt_client}...")
# logger.info("[CONTROL] Sending updated device config to Data Agent...")
# await self.configure_data_agent(self.device_config)
return {"status": "Onboarding completed."}
async def subscribe_to_named_shadow_updates(self, thing_name, shadow_name):
"""Subscribes to the shadow update accepted, rejected, and delta topics using AWS IoT SDK."""
def on_accepted(response):
logger.info(f"[CONTROL] Shadow '{shadow_name}' update accepted...")
def on_rejected(response):
logger.exception(f"[CONTROL] Shadow '{shadow_name}' update rejected...")
def on_delta(response):
"""Handles shadow delta updates and applies changes to the device."""
logger.info(f"[CONTROL] Shadow '{shadow_name}' delta received...")
delta_state = response.state
try:
loop = asyncio.get_running_loop()
loop.create_task(self.apply_shadow_update(shadow_name, delta_state))
except RuntimeError:
logger.warning(
"[CONTROL] No running event loop found. Running apply_shadow_update manually."
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(self.apply_shadow_update(shadow_name, delta_state))
try:
# mqtt_client = self._get_mqtt_client()
shadow_client = iotshadow.IotShadowClient(self.mqtt_client)
# Subscribe to update accepted
accepted, _ = shadow_client.subscribe_to_update_named_shadow_accepted(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_accepted,
)
# Subscribe to update rejected
rejected, _ = shadow_client.subscribe_to_update_named_shadow_rejected(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_rejected,
)
# Subscribe to update/delta to detect changes
delta, _ = shadow_client.subscribe_to_named_shadow_delta_updated_events(
request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_delta,
)
# Wait for subscriptions to succeed
accepted.result()
rejected.result()
delta.result()
logger.info(
f"[CONTROL] Successfully subscribed to {thing_name}/shadow/name/{shadow_name} shadows..."
)
except Exception as e:
logger.error(
f"[CONTROL] Failed to subscribe to shadow updates for {shadow_name}: {e}"
)
async def publish_named_shadow_update(self, thing_name, shadow_name, state):
"""Publishes an update to an AWS IoT Named Shadow using AWS IoT SDK."""
logger.info(f"[CONTROL] Updating Shadow '{shadow_name}' for {thing_name}...")
try:
# mqtt_client = self._get_mqtt_client()
shadow_client = iotshadow.IotShadowClient(self.mqtt_client)
_ = shadow_client.publish_update_named_shadow(
request=iotshadow.UpdateNamedShadowRequest(
thing_name=thing_name,
shadow_name=shadow_name,
state=iotshadow.ShadowState(reported=state),
# version=1
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
)
pub = _.result()
logger.info(f"[CONTROL] Shadow update published: {pub}")
except Exception as e:
logger.error(f"[CONTROL] Failed to update Shadow '{shadow_name}': {e}")
async def apply_shadow_update(self, shadow_name, delta_state):
"""Applies configuration updates received from shadow update/delta events."""
try:
logger.info(
f"[CONTROL] Applying update from shadow '{shadow_name}': {delta_state}"
)
_state = None
if shadow_name == "reader-config-policy":
logger.info("[CONTROL] Updating Reader Configuration...")
self.device_config["readerConfig"].update(delta_state)
_state = self.device_config["readerConfig"]
else:
logger.warning(
f"[CONTROL] Unknown shadow update received: {shadow_name}"
)
return
# Persist updated configuration
serial_number = self.device_config.get("serialNumber")
self.db_manager.save_device_config(serial_number, self.device_config)
logger.info(f"[CONTROL] Updated configuration saved for {shadow_name}")
# Acknowledge the update by reporting the new state back to AWS IoT
await self.publish_named_shadow_update(
self.device_config["thingName"], shadow_name, _state
)
except Exception as e:
logger.error(
f"[CONTROL] Failed to apply shadow update for {shadow_name}: {e}"
) And here's my DeviceAgent base class, where I implemented some utility methods class DeviceAgent:
def __init__(self):
self.zmq = ZeroMQManager()
self.db_manager = DBManager()
self.keystore = KeystoreManager()
self.client_dir = os.getenv("CLIENT_DIR")
self.client_config_path = os.path.join(self.client_dir, "config", "client.json")
self.client_config = self.load_client_config()
self.device_config = None
self.mqtt_client = None
self.identity_client = None
self.iot_endpoint = self.client_config.get("iotDataEndpoint")
self.debug_mode = self.client_config.get("debugMode", False)
self.is_provisioned = False
self._mqtt_connection_success = Future()
self._mqtt_connection_stop = Future()
def _get_mqtt_client(self, provisioning=False):
"""Returns an existing MQTT client or creates a new one if needed."""
if not self.mqtt_client:
self.mqtt_client = self._connect_to_mqtt(provisioning=provisioning)
return self.mqtt_client
def _connect_to_mqtt(self, provisioning=False):
"""
Establishes a persistent MQTT 5 connection to AWS IoT.
- Uses **claim certificates** during provisioning.
- Uses **device certificates** after provisioning.
- Handles automatic reconnections.
"""
# Select the appropriate certificates
if provisioning:
cert_path, key_path = self.keystore.get_claim_certificate()
client_id = uuid.uuid4().hex
else:
if not self.device_config:
logger.error("[DeviceAgent] No device configuration found for MQTT.")
return None
cert_path = str(self.keystore.device_cert_file)
key_path = str(self.keystore.device_key_file)
client_id = self.device_config["thingName"]
logger.info(f"Connecting with {cert_path} and {key_path}")
if not cert_path or not key_path:
logger.error("[DeviceAgent] Missing certificates. Cannot connect to MQTT.")
return None
logger.info(f"[DeviceAgent] Connecting to AWS IoT as '{client_id}'...")
# Create MQTT 5 Client
_mqtt_client = mqtt5_client_builder.mtls_from_path(
endpoint=self.iot_endpoint,
port=8883,
cert_filepath=cert_path,
pri_key_filepath=key_path,
client_id=client_id,
clean_session=False,
keep_alive_secs=300,
on_lifecycle_stopped=self._on_mqtt_stopped,
on_lifecycle_connection_success=self._on_mqtt_success,
on_lifecycle_connection_failure=self._on_mqtt_failure,
)
try:
_mqtt_client.start()
connect_success_data = self._mqtt_connection_success.result()
connack_packet = connect_success_data.connack_packet
# negotiated_settings = connect_success_data.negotiated_settings
logger.info(
f"[DeviceAgent] MQTT Connection Established {connack_packet.reason_code.name}..."
)
return _mqtt_client
except Exception as e:
logger.exception(f"[DeviceAgent] MQTT Connection Error: {e}")
return
def _on_mqtt_stopped(self, event):
"""Handles MQTT disconnections."""
logger.warning("[DeviceAgent] MQTT connection stopped...")
self._mqtt_connection_stop.set_result(event)
self.mqtt_client = None
self._mqtt_connection_success = Future()
def _on_mqtt_success(self, event):
"""Handles MQTT successful connections and prevents double Future completion."""
logger.info("[DeviceAgent] MQTT connection success.")
if not self._mqtt_connection_success.done():
self._mqtt_connection_success.set_result(event)
else:
logger.warning(
"[DeviceAgent] MQTT success event already set. Ignoring duplicate event."
)
def _on_mqtt_failure(self, event):
"""Handles MQTT reconnections."""
logger.info(f"[DeviceAgent] MQTT connection exception: {event.exception}.")
self.mqtt_client = None
self._mqtt_connection_stop = Future()
self._mqtt_connection_success = Future()
def _disconnect_mqtt(self):
"""Gracefully disconnects the existing MQTT connection."""
if self.mqtt_client:
try:
logger.info("[DeviceAgent] Disconnecting MQTT Client...")
self.mqtt_client.stop()
self._mqtt_connection_stop.result()
except Exception as e:
logger.error(f"[DeviceAgent] MQTT Disconnection Error: {e}")
def _restart_mqtt(self):
"""Restarts the MQTT connection."""
logger.info("[DeviceAgent] Restarting MQTT connection...")
self._disconnect_mqtt()
self._get_mqtt_client() The device provisioning is working fine in the If I restart my process, the agent initialize the device and then subscribe to the shadows topics again. If I make change to the desired state of shadow I still don't get the event. I can see in the AWS console that the thing has subscribed to the topics and is connected. Another strange thing is that it works if I use mqtt3 client with mqtt_connection_builder, instead of mqtt5_client_builder. Can someone please help? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
An isolated self-contained example reproducing the issue would help to find the root cause. If you can do this, that'd be great.
|
Beta Was this translation helpful? Give feedback.
-
Alright, I think I found the root cause. This code block from the
When you use MQTT3, the first Now, in your script, in I believe the MQTT5 logic is the correct one. It's unintentional behavior that shadow subscriptions continue working on MQTT3 after To fix your script, you need to keep a shadow client, i.e. use |
Beta Was this translation helpful? Give feedback.
-
Hello! Reopening this discussion to make it searchable. |
Beta Was this translation helpful? Give feedback.
Alright, I think I found the root cause. This code block from the
IotShadowClient
's parent class is the answer:When you use MQTT3, the first
if
branch just uses the connection you pass to it from your code (DeviceAgent::mqtt_client
object).When you switch to MQTT5, the second
if
branch executes. You can see that a new object is created there (self._mqtt_connection = mqtt_connection.n…