-
Notifications
You must be signed in to change notification settings - Fork 76
ThingsBoard Python Client SDK 2.0 – Complete Architectural Rewrite Based on gmqtt #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…andidate for claiming processing for device client
…PC processing fails with details
Added device provisioning and firmware update
Fixed on_unsubscribe callback
…r json message dispatcher
Added provisioning and firmware update examples
…ateway and example
…est due to new entities usage
…end data without provided ts with correct timestamp
…ored rate limits entities, added initial implementation of gateway message splitter
amazing ! is help needed or wished ? make tb finally async would be amazing ... working with MSA containerd ( @K3S, @k0s, @microk8s ) with several asyncron processes where we use celery etc... the gap is still the sdk paho... and looked by my self long to make it asyn by my self ... ( email: [email protected] ) |
Hi @MannimMond86, Thanks for your interest in this PR! As you’ve noticed, we decided to base the asynchronous SDK on the gmqtt library, with some custom patches (tb_mqtt_client/common/gmqtt_patch.py). Right now, we already have unit, integration, and blackbox tests for the core functionality — but this is still an initial implementation. It may contain bugs or rough edges, and real-world use cases are exactly what will help us refine it. If you’d like to join the effort, we’d be glad to have you! You could start by building the application you have in mind using the new SDK and sharing any bugs, performance issues, or usability problems you encounter. Your experience with async microservice environments (Celery, container orchestration, etc.) would be especially valuable for shaping this SDK into truly production-ready. |
…s for case when platform is running locally
Description:
This PR introduces a complete rewrite of the Python Client SDK using an asynchronous, event‑driven architecture built on top of gmqtt.
The new implementation is optimized for high‑throughput telemetry and attribute publishing, strict ordering, advanced delivery tracking, and robust gateway/multi‑device operation.
Tested throughput: ~500,000 uplink datapoints/second under optimal conditions.
⸻
Architectural Changes
Core MQTT Layer
• New MQTTManager built on gmqtt with:
• Patched CONNACK, DISCONNECT, and PUBACK handlers for MQTT 5.0 reason codes/properties.
• QoS 1 persistent inflight tracking and retransmission.
• Publish/subscribe acknowledgment tracking via asyncio.Future.
• Integrated BackpressureController for automatic publish pause/resume.
Message Dispatching
• Unified MessageDispatcher for telemetry, attributes, and RPC publishing.
• JSON serialization with payload size and datapoint limit enforcement.
• Configurable batching and splitting via MessageSplitter.
Rate Limit Enforcement
• Supports ThingsBoard‑style multi‑window limits (10:1,300:60).
• Separate telemetry message and datapoint quotas.
• Head‑blocking queue logic ensures in‑order sending when rate limits are hit.
Message Queue
• Asynchronous deque with:
• Head‑first batch formation (no skipping blocked messages).
• Deduplication for re‑queued messages.
• Priority retry queue for QoS 1 retransmissions.
Structured Uplink Messages
• DeviceUplinkMessage immutable dataclass + builder:
• Predictable size tracking before publish.
• Shared delivery future for all split/batched payloads from a single source message.
• Reduced per‑datapoint overhead, efficient allocation.
Gateway Support
• GatewayClient manages connected devices via SubDeviceManager.
• Handles:
• Sub‑device attribute updates.
• RPC requests.
• Attribute responses.
• Same batching/splitting logic for main and sub‑devices.
Provisioning & Firmware
• ProvisioningClient supports token/basic/X.509 provisioning.
• FirmwareUpdater supports streaming download, checksum verification, and persistent storage.
⸻
Performance Notes
• Throughput: ~500k uplink datapoints/sec in optimal lab conditions.
• Latency: Low per‑message processing overhead.
• Ordering: Guaranteed strict ordering in the send queue.
• Delivery Tracking: PUBACK futures for precise delivery confirmation.
⸻
Testing
• Coverage includes:
• MQTT lifecycle (connect/reconnect/disconnect).
• PUBACK and QoS 1 retransmission handling.
• Rate limit exhaustion and recovery.
• Message batching/splitting edge cases.
• Gateway sub‑device lifecycle and routing.