feat(integration): deliver client lifecycle events to integrations #323
Open
deaflynx wants to merge 104 commits into
Conversation
…bIeMsgProto wrapper
…delete; allow events-only integration
…ation handlers; add missing license header
… edit form
- Move lifecycleEventTypes from the page root form into each integration config form group (kafka/mqtt/http), as a sibling of topicFilters; render it as a security-setting card directly below the Topic filters card
- Drop the topicFilters-style control when isNew so the wizard's dedicated lifecycle step stays authoritative
- markForCheck() in the lifecycle component's writeValue/setDisabledState so OnPush reflects persisted event types when editing an existing integration
- Show the "no selection has no effect" hint only in the wizard via a new showNoSelectionHint input
- Convert maxStep getter to a readonly field (eslint class-literal-property-style)
Replace the checkbox list with a Material multi-select dropdown, following the project's reactive-CVA idiom (internal UntypedFormControl + valueChanges) so OnPush change detection is no longer needed. Add the "Event types" label key for the select.
Publish a lifecycle event when a client unsubscribes, mirroring the CLIENT_SUBSCRIBED flow:
- new ClientLifecycleEventType.CLIENT_UNSUBSCRIBED enum value
- ClientLifecycleEventMsgProto gains a repeated topicFilters field (unsubscribe carries plain topic filters, no QoS)
- IntegrationLifecycleEventPublisher.publishUnsubscribed, fired from MqttUnsubscribeHandler after the UNSUBACK is flushed
- AbstractIntegration emits a "topicFilters" array for the unsubscribe body
- UI: add the event type to the model enum, lifecycle multi-select and i18n
…igger Use a custom mat-select-trigger with a mat-chip-set so selected event types wrap to multiple rows and the form field grows in height instead of truncating the comma-joined value with an ellipsis.
# Conflicts: # application/src/main/java/org/thingsboard/mqtt/broker/actors/client/service/disconnect/DisconnectServiceImpl.java
The lifecycle-event integration feature added new constructor dependencies and changed updateSubscriptions logic without updating the affected tests, breaking 68 unit tests.
- Add @MockitoBean for IntegrationLifecycleEventPublisher to DisconnectServiceImplTest, MqttSubscribeHandlerTest and ConnectServiceImplTest, and for IntegrationLifecycleEventTypeCache to BrokerInitializerTest, so the Spring test contexts load again.
- Give the DefaultPlatformIntegrationServiceTest integration a non-empty topicFilters config, since updateSubscriptions now skips empty arrays.
…hIntegrationMsgProto
Remove TbIeMsgProto wrapper and UUID random key from the data path:
- IntegrationMsgQueuePublisher interface reverts to PublishIntegrationMsgProto
- IntegrationMsgQueuePublisherImpl reverts field/builder/param types and restores the topic-aware failure log
- MsgPersistenceManagerImpl.sendIntegrationMsg reverts to direct PublishIntegrationMsgProto with topic-name message key; removes stale TbIeMsgProto and UUID imports
…ed events stream Drop TbIeMsgProto import and IntegrationMsgQueuePublisher field from IntegrationLifecycleEventPublisherImpl; replace the private publish() sink with a logged no-op (Phase 3 rewire deferred). Also migrate IntegrationPackProcessingContext and IntegrationPackProcessingResult in integration-api from the now-removed TbIeMsgProto to PublishIntegrationMsgProto so that `application -am compile` succeeds.
…lt by message type Replace the concrete PublishIntegrationMsgProto type with a type parameter <T> in IntegrationPackProcessingContext and IntegrationPackProcessingResult so one set of machinery can serve both the DATA stream and a future EVENTS stream. Widen IntegrationProcessorStats.log() and its impl to IntegrationPackProcessingResult<?> to accept any parameterization without raw-type warnings.
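The parameterization described in this commit can be illustrated with a minimal sketch. PackResultSketch and PackStatsSketch are hypothetical, simplified stand-ins (not the real IntegrationPackProcessingResult or IntegrationProcessorStats); the point is only that a `<?>` wildcard lets one stats method accept a result for either stream without raw types.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for a pack result that is generic over
// the message type, so a data stream and an events stream can share it.
final class PackResultSketch<T> {
    private final Map<UUID, T> pending;
    private final Map<UUID, T> failed;

    PackResultSketch(Map<UUID, T> pending, Map<UUID, T> failed) {
        this.pending = Map.copyOf(pending);
        this.failed = Map.copyOf(failed);
    }

    Map<UUID, T> getPending() { return pending; }
    Map<UUID, T> getFailed() { return failed; }
}

// Hypothetical stats helper: the wildcard accepts any parameterization,
// because only the sizes are read and the element type is never touched.
final class PackStatsSketch {
    static String describe(PackResultSketch<?> result) {
        return "pending=" + result.getPending().size()
                + ", failed=" + result.getFailed().size();
    }
}
```

Because describe never reads an element of type T, widening to `PackResultSketch<?>` costs nothing in type safety.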
…efusals
CLIENT_CONNECTION_FAILED was emitted only from refuseConnection (the broker-side quota/server-unavailable path). The shouldProceedWithConnection early-return refuses the connection inline for MQTT-protocol validation failures (invalid client id, MQTT5 receive-max protocol error, invalid or unauthorized last-will topic) without going through refuseConnection, so those refusals emitted no lifecycle event at all. Emit CLIENT_CONNECTION_FAILED there too, so every broker CONNECT refusal produces the event:
- unify publishConnectionFailed to take the SessionInfo and a reason String, and build the event from the passed session: the validation path runs before ctx.setSessionInfo, so ctx.getSessionInfo() is null there and the already-built sessionInfo is passed explicitly
- the reason is the MQTT connect return code the client received (MqttConnectReturnCode.name(), e.g. CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID); the broker-side path keeps emitting the ClientSessionFailureReason name (SERVER_UNAVAILABLE / QUOTA_EXCEEDED / UNSPECIFIED_ERROR)
- the emit is best-effort and gated on interested integrations, like the other lifecycle emits
Tests: every shouldProceedWithConnection refusal test now asserts the event and its reason; a publisher test verifies the event is built from the passed sessionInfo when ctx has none.
…g config
The IE executor shared the data stream's poll-interval, pack-processing-timeout and ack-strategy with the lifecycle-events stream, even though the Kafka topic/consumer/producer configs were already split per stream. The coupling matters for the ack-strategy: if the data stream is set to RETRY_ALL, events inherit it and a failing best-effort event gets retried and redelivered stale / out of order, and can head-of-line-block the events consumer. Split all three under queue.integration-msg, mirroring the topic split:
- add event-poll-interval, event-pack-processing-timeout and event-ack-strategy (type/retries/pause-between-retries); event-ack-strategy defaults to SKIP_ALL so events stay best-effort independently of the data strategy
- IntegrationEventAckStrategyConfiguration binds the new prefix (a distinct type so both configs inject unambiguously)
- IntegrationAckStrategyFactory gains newEventInstance sharing a build() helper
- IntegrationMsgProcessorImpl threads the per-stream poll interval, pack timeout and ack-strategy supplier into the shared processQueue loop
Tests: IntegrationAckStrategyFactoryTest asserts newInstance follows the data config (RETRY_ALL -> reprocess) while newEventInstance follows the event config (SKIP_ALL -> commit); existing events processor tests still pass.
dmytro-landiak (Contributor) left a comment
Re-review summary
Re-reviewed feat(integration): deliver client lifecycle events to integrations — verified 5 finding(s) from the previous review. The fix diff since then spans 16 commits.
| Status | Count |
|---|---|
| ✅ Resolved | 4 |
| 💬 Acknowledged | 1 |
All previous findings have been addressed. Also left 2 new low-severity design notes on the fix commits, inline.
Previous-finding details
- ✅ application/src/main/resources/thingsboard-mqtt-broker.yml - the events producer comment claimed the send could "never" stall an MQTT thread - Resolved: the comment now documents the real up-to-max.block.ms stall bound.
- ✅ application/.../actors/client/service/handlers/MqttUnsubscribeHandler.java - shared-sub UNSUBSCRIBE emitted $share/group/topic while CLIENT_SUBSCRIBED emitted the bare filter - Resolved: now emits the bare topic filter plus shareName, symmetric with CLIENT_SUBSCRIBED.
- 💬 application/.../service/mqtt/validation/PublishMsgValidationServiceImpl.java - a lifecycle event fires on every denied publish with no dedup - Acknowledged: confirmed intentional (best-effort, bounded by the client's publish rate limit) and now documented in code.
- ✅ common/integration/.../integration/api/AbstractIntegration.java - the body builder switched on raw string literals that could drift from the enum - Resolved: now switches on the ClientLifecycleEventType enum via a lenient fromName.
- ✅ application/.../service/integration/IntegrationLifecycleEventPublisher.java - action was a free-form String the callers hand-typed - Resolved: introduced the AuthorizationAction enum (publish/subscribe).
This re-review was auto-generated. Findings may contain errors — please verify before applying changes.
…TION_FAILED
The broker-refusal path (refuseConnection) emitted the internal ClientSessionFailureReason name (SERVER_UNAVAILABLE/QUOTA_EXCEEDED/UNSPECIFIED_ERROR), while the pre-connection validation path emitted the MqttConnectReturnCode name, so the same event field carried two vocabularies. Map the failure reason to its CONNACK return code (the code the client actually receives) and emit that, so both paths speak one vocabulary, consistent with CLIENT_DISCONNECTED carrying the MQTT reason-code name. The return code is now computed once and reused for the CONNACK send.
IntegrationAckStrategyConfiguration and IntegrationEventAckStrategyConfiguration were field-for-field identical. Move the shared type/retries/pauseBetweenRetries into an abstract IntegrationAckStrategyProperties base so a future field is declared once; the two concrete classes keep only their @Component and @ConfigurationProperties prefix. The base carries no bean annotations, so both configs remain unambiguous to inject by type, and the factory is unchanged.
…AILED reason The enhanced-auth (SCRAM) failure path emitted the MQTT CONNACK return-code name into CLIENT_AUTHENTICATION_FAILED.reason, while the basic-auth path emits a descriptive reason. Emit enhancedAuthFailure().name() instead so the reason field describes why authentication failed on both paths rather than leaking a protocol return code.
The stats sink is not data-stream only: processEvents also creates and passes INTEGRATION_EVENT_PROCESSOR stats. Note the per-stream sink and distinct meter keys.
The lifecycle-events i18n block added for the client lifecycle events feature existed only in en_US. Add the full block (14 keys) to the de_DE, es_ES, zh_CN and hi_IN locales, mirroring the en_US placement, so the lifecycle-events UI is localized rather than falling back to English.
Carry the X.509 certificate Common Name as a common lifecycle-event field alongside username, populated from the session ctx in newSessionBuilder so it rides on every session-scoped event (CONNECTED/DISCONNECTED/SUBSCRIBED/UNSUBSCRIBED/AUTHORIZATION_FAILED/CONNECTION_FAILED) when present. Emitted only for X.509-authenticated clients (omitted when empty, like username). New additive proto field; no UI change.
…unused record key The publisher passed PublishMsgCallback.EMPTY, so asynchronous Kafka send failures (the common failure mode) were logged but never incremented the dropped-event metric; only the rare synchronous throw was counted. Pass a reused real callback that increments droppedLifecycleEventStats on failure, and drop the redundant per-iteration try/catch (TbPublishServiceImpl already routes synchronous failures to the callback). Also stop generating a per-event UUID key: the event stream is single-partition (0) with delete-based retention, so the record key is unused for partition routing, compaction, and consumer-side dedup (the consumer assigns its own packet id). Use the no-key TbProtoQueueMsg constructor.
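A minimal sketch of the callback change described in this commit, under simplifying assumptions: SendCallback, EventSender and LifecycleEventPublisherSketch are hypothetical types invented for illustration (the real code uses PublishMsgCallback, TbPublishServiceImpl and droppedLifecycleEventStats, whose signatures are not shown here). It only demonstrates one reused callback instance that counts failed sends.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical callback contract, standing in for the real publish callback.
interface SendCallback {
    void onSuccess();
    void onFailure(Throwable t);
}

// Hypothetical sender; a real one would hand the record to a Kafka producer
// and invoke the callback when the send completes or fails.
interface EventSender {
    void send(String event, SendCallback callback);
}

final class LifecycleEventPublisherSketch {
    private final AtomicLong dropped = new AtomicLong();
    private final EventSender sender;

    // One callback instance reused for every send: nothing is allocated per event.
    private final SendCallback dropCountingCallback = new SendCallback() {
        @Override
        public void onSuccess() {
            // Nothing to record on success.
        }

        @Override
        public void onFailure(Throwable t) {
            dropped.incrementAndGet(); // failures now reach the metric
        }
    };

    LifecycleEventPublisherSketch(EventSender sender) {
        this.sender = sender;
    }

    void publish(String event) {
        sender.send(event, dropCountingCallback);
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

With an empty callback in place of dropCountingCallback, the failure branch would be a no-op and the counter would never move, which is the gap the commit message describes.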
Inside the anonymous TbQueueCallback, this.getClass() resolved to the anonymous class (empty getSimpleName()), so ClientLogger emitted a blank location tag. Qualify with IntegrationMsgQueuePublisherImpl.this to log the enclosing publisher, matching IntegrationEventMsgQueuePublisher.
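The behavior behind this fix can be reproduced in isolation. In the sketch below, PublisherSketch and its Callback interface are hypothetical names; the only facts relied on are that `this` inside an anonymous class refers to the anonymous instance, and that Class.getSimpleName() returns an empty string for anonymous classes.

```java
// Hypothetical outer class standing in for the publisher.
final class PublisherSketch {

    interface Callback {
        String location();
    }

    // Unqualified 'this' refers to the anonymous Callback instance.
    Callback unqualified() {
        return new Callback() {
            @Override
            public String location() {
                return this.getClass().getSimpleName();
            }
        };
    }

    // 'PublisherSketch.this' refers to the enclosing publisher instance.
    Callback qualified() {
        return new Callback() {
            @Override
            public String location() {
                return PublisherSketch.this.getClass().getSimpleName();
            }
        };
    }
}
```

Calling `new PublisherSketch().unqualified().location()` yields an empty string, while `qualified().location()` yields the enclosing class's simple name.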
Enrich PublishMsgValidationServiceImplTest to verify the previously-unused IntegrationLifecycleEventPublisher mock: publishAuthorizationDenied is invoked once (with PUBLISH action + topic) when a publish is not authorized, and never when it is. Also migrate the deprecated @MockBean/@SpyBean to @MockitoBean/@MockitoSpyBean.
The droppedLifecycleEvents constant is a pure micrometer counter name (unlike DROPPED_MSGS, which is also a persisted historical key bound to HISTORICAL_KEYS), so it belongs with the other stats meter names in StatsConstantNames rather than BrokerConstants. It is not referenced from common/data, so the move introduces no module cycle. Repoint both usages (StatsManagerImpl, DefaultDroppedLifecycleEventStats).
….integration-event
Split the lifecycle-event stream's config out of the integration-msg namespace into its own queue.integration-event / queue.kafka.integration-event sections, dropping the redundant event- key prefixes so the keys mirror the integration-msg siblings.
- add IntegrationEventKafkaSettings bound to queue.kafka.integration-event
- drop event* fields from IntegrationMsgKafkaSettings and wire the new bean into KafkaIntegrationMsgQueueFactory
- repoint IntegrationMsgProcessorImpl @Value paths and the IntegrationEventAckStrategyConfiguration prefix to queue.integration-event
- restructure the executor and broker yml files accordingly
Env-var names are unchanged, so existing deployment overrides keep working.
… pack RetryStrategy built the reprocess map from the pending/failed maps, which are unordered (ConcurrentHashMap -> HashMap copies), so the retry pass dispatched messages in arbitrary order. The keys are UUID(packId, index), so sort by key and collect into a LinkedHashMap to restore the original consume/offset order. No change to IntegrationPackProcessingResult is needed since the ordering is recoverable from the keys. Only affects the RETRY_ALL ack strategy; both streams default to SKIP_ALL.
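The reordering step can be sketched as follows. RetryOrderSketch is a hypothetical helper, not the actual RetryStrategy code. It assumes keys are built as `new UUID(packId, index)` with non-negative values, since UUID.compareTo compares the two halves as signed longs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

final class RetryOrderSketch {

    // Sorts the entries by key and collects them into a LinkedHashMap,
    // so iteration order follows (packId, index) instead of hash order.
    static <T> Map<UUID, T> inConsumeOrder(Map<UUID, T> unordered) {
        return unordered.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (first, second) -> first, // keys are unique, never invoked
                        LinkedHashMap::new));
    }
}
```

Because the position is encoded in the key, no extra ordering field is needed on the result object, which matches the note in the commit message.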
…rocessLifecycleEvent processLifecycleEvent pre-serialized the event to a String, so the HTTP integration handed a String to WebClient and it went out as text/plain instead of application/json (the message JSON path passes an ObjectNode and gets application/json). Pass the ObjectNode from constructLifecycleEventBody down to doProcessLifecycleEvent instead: HTTP forwards it directly (now application/json), while Kafka and MQTT serialize it with JacksonUtil.toString for the wire (byte-identical to before). Removed the now unused constructLifecycleEventValue. Note: doProcessLifecycleEvent/doSendBody signatures changed from String to ObjectNode.
… enabled
MQTT lifecycle events have no originating message, so they are always published to the static topic/qos/retain regardless of the dynamic (use-message-*) options. But the UI disabled the static topic (and dropped its required validator) whenever dynamic topic was on, and neither the config validator nor the form knew about the events opt-in - so events could be configured against a blank topic and silently fail at runtime.
- AbstractIntegration: add a doValidateLifecycleEventsDelivery hook (default no-op), invoked only when lifecycle events are opted in (additive - no change to the shared doValidateConfiguration signature)
- MqttIntegration/MqttConfigValidator: require a valid static topic (non-empty, no wildcards, no leading $) for the events destination; runs on save via the executor VALIDATE task
- mqtt-integration-form: enable + require the static topic (and reveal qos/retain) whenever lifecycle events are selected, independent of the use-message-* toggles (which govern message delivery only); add a localized hint (en/de/es/zh/hi)
HTTP and Kafka deliver to a fixed destination, so no change is needed there.
…ents are enabled" This reverts commit 65451a4.
…LED lifecycle event The publisher already computes and sets protocolVersion on the auth-failed event proto, but constructLifecycleEventBody only emitted it for CLIENT_CONNECTED, so it never reached the JSON payload. Emit it for CLIENT_AUTHENTICATION_FAILED too (mirroring CLIENT_CONNECTED) - the protocol version is known by the time auth runs and is useful context for version-specific auth failures.
applyIntegrationLifecycleConfig was duplicated verbatim in InternodeNotificationsServiceImpl (broadcast-to-self) and InternodeNotificationsConsumerImpl (consume). Every other notification type delegates its handling to a single processor/manager so the two paths cannot drift. Follow that pattern: add processIntegrationLifecycleConfig(proto) to IntegrationLifecycleEventTypeCache, owning the delete/parse semantics in one place, and reduce both call sites to a one-line delegate. The moved logic is covered by IntegrationLifecycleEventTypeCacheImplTest against real behavior; the notification tests now assert the delegation.
…-cache index byIntegrationId is only ever touched inside the synchronized put/remove/rebuild path; reads go through the immutable volatile byEventType snapshot. A ConcurrentHashMap was doubly-defensive and obscured the design. Use a plain HashMap and document that synchronization (not the map type) provides thread-safety.
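The locking scheme this commit documents can be sketched roughly as below. LifecycleEventTypeCacheSketch is a hypothetical simplification that uses plain strings for integration ids and event types; the real cache's types and method names may differ.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class LifecycleEventTypeCacheSketch {

    // Plain HashMap: only touched inside synchronized methods, so the
    // monitor (not the map type) provides thread-safety.
    private final Map<String, Set<String>> byIntegrationId = new HashMap<>();

    // Immutable snapshot, swapped atomically; readers never lock.
    private volatile Map<String, Set<String>> byEventType = Map.of();

    synchronized void put(String integrationId, Set<String> eventTypes) {
        byIntegrationId.put(integrationId, Set.copyOf(eventTypes));
        rebuild();
    }

    synchronized void remove(String integrationId) {
        byIntegrationId.remove(integrationId);
        rebuild();
    }

    // Hot-path read: a single volatile read plus one map lookup.
    Set<String> interestedIn(String eventType) {
        return byEventType.getOrDefault(eventType, Set.of());
    }

    // Called only while holding the monitor.
    private void rebuild() {
        Map<String, Set<String>> next = new HashMap<>();
        byIntegrationId.forEach((id, types) ->
                types.forEach(type -> next.computeIfAbsent(type, k -> new HashSet<>()).add(id)));
        next.replaceAll((type, ids) -> Set.copyOf(ids));
        byEventType = Map.copyOf(next);
    }
}
```

Writers pay for a full rebuild, which is acceptable if integration saves and deletes are rare compared with connect and disconnect traffic.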
…t type cache The startup line reported the total integration count regardless of how many were opted in - '50 integrations' even when zero had lifecycle events enabled. Count the cached integrations and log 'cached N of M' so the line is diagnostic.
…mplete Render the Events topic name field in the add-integration wizard (MQTT only, when lifecycle events are selected), mirroring the edit view, and merge its value into clientConfiguration on save. Add topic autocomplete to the field in both the wizard and the MQTT config form. Note QoS 1 / retain off in the lifecycle events hint across all locales.
Pull Request description
Adds the ability for integrations to receive client lifecycle events, in addition to the regular published messages they already process. Until now an integration could only be fed MQTT PUBLISH messages matched by topic filters; this PR lets an integration also be notified when a client connects, disconnects, (un)subscribes, or fails authentication/authorization, and deliver those events to the external system over the existing HTTP / Kafka / MQTT integration types.
Requested by the community in #259 and #127.
Event types
| Event type | Event-specific fields |
|---|---|
| CLIENT_CONNECTED | cleanStart, keepAlive, protocolVersion, sessionExpiryInterval |
| CLIENT_DISCONNECTED | disconnectReason (MQTT reason-code name, e.g. SESSION_TAKEN_OVER) |
| CLIENT_SUBSCRIBED | subscriptions[], each with topicFilter, qos, options (noLocal/retainAsPublish/retainHandling), plus shareName and subscriptionId when present |
| CLIENT_UNSUBSCRIBED | subscriptions[], each with topicFilter, plus shareName when the removed subscription was shared |
| CLIENT_AUTHENTICATION_FAILED | reason |
| CLIENT_AUTHORIZATION_FAILED | action (publish/subscribe), topic |
| CLIENT_CONNECTION_FAILED | reason |
Every event also carries the common fields eventType, clientId, username, clientCertCn, sessionId, ipAddress, ts, tbmqNode, and the integration metadata. String fields are omitted when empty (e.g. username when authentication is disabled, clientCertCn when the client did not authenticate with an X.509 certificate).
Configuration / delivery modes
An integration's configuration now has two independent, optional opt-in lists:
- topicFilters: data messages to deliver (existing behavior).
- lifecycleEventTypes: lifecycle event types to deliver (new).
An integration may specify either or both, which yields three delivery modes: data-only (unchanged), events-only, and data + events. At least one of the two lists must be non-empty.
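As a rough illustration of the three delivery modes and the "at least one list" rule, here is a minimal sketch. DeliveryModeSketch and its Mode enum are hypothetical and do not reflect the actual validation code in the PR.

```java
import java.util.List;

final class DeliveryModeSketch {

    enum Mode { DATA_ONLY, EVENTS_ONLY, DATA_AND_EVENTS }

    // Derives the delivery mode from the two opt-in lists; rejects a
    // configuration in which both lists are missing or empty.
    static Mode resolve(List<String> topicFilters, List<String> lifecycleEventTypes) {
        boolean data = topicFilters != null && !topicFilters.isEmpty();
        boolean events = lifecycleEventTypes != null && !lifecycleEventTypes.isEmpty();
        if (data && events) {
            return Mode.DATA_AND_EVENTS;
        }
        if (data) {
            return Mode.DATA_ONLY;
        }
        if (events) {
            return Mode.EVENTS_ONLY;
        }
        throw new IllegalArgumentException(
                "At least one of topicFilters or lifecycleEventTypes must be non-empty");
    }
}
```

For example, `resolve(List.of(), List.of("CLIENT_CONNECTED"))` yields EVENTS_ONLY, while two empty lists are rejected.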
Design highlights
- … PublishIntegrationMsgProto, so the existing data path is untouched.
- … event type → interested integration ids) is built at startup and kept in sync via internode notifications on integration save/delete, so the connect/disconnect/sub/unsub hot path does an O(1), allocation-free lookup and produces nothing when no integration is interested.
- … max.block.ms caps the worst-case stall.
Backward compatibility
Fully backward compatible — no upgrade script required:
Documentation
- … lifecycleEventTypes option, the available event types, and the JSON payload for each.
- … thingsboard-mqtt-broker.yml, queue.kafka.integration-msg.*event*) should be documented.
General checklist
Front-End feature checklist
Back-End feature checklist