Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
7b98449
feat: add lifecycle event protos for integration fan-out and internod…
deaflynx Jun 8, 2026
9508195
refactor: migrate IE queue layer from PublishIntegrationMsgProto to T…
deaflynx Jun 8, 2026
d8fcb5c
refactor: wrap published IE messages in TbIeMsgProto oneof wrapper
deaflynx Jun 8, 2026
5a99e1f
feat: add ClientLifecycleEventType enum
deaflynx Jun 8, 2026
6033f53
feat: add IntegrationLifecycleEventTypeCache for per-node event type …
deaflynx Jun 8, 2026
b1b31bb
feat: propagate integration lifecycle event config via internode noti…
deaflynx Jun 8, 2026
ef7d192
feat: broadcast lifecycle event config internode on integration save/…
deaflynx Jun 8, 2026
c66bfd8
feat: populate integration lifecycle event type cache at broker startup
deaflynx Jun 8, 2026
ae1ea05
feat: publish integration lifecycle events on connect, disconnect, su…
deaflynx Jun 8, 2026
8c04bad
feat: dispatch lifecycle event messages in integration executor
deaflynx Jun 8, 2026
d6600a7
feat: add ClientLifecycleEventType model and IntegrationLifecycleEven…
deaflynx Jun 8, 2026
b5d9742
feat: add i18n keys for integration lifecycle events
deaflynx Jun 8, 2026
64c5643
feat: add lifecycle events step to integration wizard
deaflynx Jun 8, 2026
8712cb4
feat: add lifecycle events control to integration edit view
deaflynx Jun 8, 2026
d1eca8d
fix: resolve lint errors in lifecycle events components
deaflynx Jun 8, 2026
9cbe761
fix: guard against unknown lifecycle event types in internode notific…
deaflynx Jun 8, 2026
72b3fee
fix: relocate integration lifecycle events into config card and patch…
deaflynx Jun 11, 2026
97a6461
refactor: use mat-select multiple for integration lifecycle event types
deaflynx Jun 11, 2026
b7c211e
feat: add CLIENT_UNSUBSCRIBED integration lifecycle event
deaflynx Jun 11, 2026
d1a35cf
feat: render lifecycle event types as wrapping chips in the select tr…
deaflynx Jun 11, 2026
3cd7565
Merge branch 'develop/2.4' into feat/mqtt-events-integration
dmytro-landiak Jun 24, 2026
ee57bc1
test: align unit tests with MQTT lifecycle-event integration deps
dmytro-landiak Jun 24, 2026
12ea055
refactor(integration): remove TbIeMsgProto oneof from integration.proto
dmytro-landiak Jun 24, 2026
26245d9
refactor(queue): revert IE data-path providers/factory to PublishInte…
dmytro-landiak Jun 24, 2026
ecd30ff
refactor(integration): revert broker IE data-path publisher to Publis…
dmytro-landiak Jun 24, 2026
3a1fb8e
refactor(integration): stub lifecycle-event publisher pending dedicat…
dmytro-landiak Jun 24, 2026
f87d557
refactor(integration): generify IntegrationPackProcessingContext/Resu…
dmytro-landiak Jun 24, 2026
12fa437
refactor(integration): generify IE backpressure strategies and revert…
dmytro-landiak Jun 24, 2026
a419e7f
feat(integration): add dedicated IE events topic and consumer-group n…
dmytro-landiak Jun 24, 2026
c3ce44b
feat(queue): add dedicated IE events producer/consumer to provider an…
dmytro-landiak Jun 24, 2026
b66acf7
feat(integration): add dedicated best-effort IE event msg queue publi…
dmytro-landiak Jun 24, 2026
59b8c47
feat(integration): publish lifecycle events on dedicated best-effort …
dmytro-landiak Jun 24, 2026
d149eb4
fix(stats): log and reset droppedLifecycleEventStats in printStats; a…
dmytro-landiak Jun 24, 2026
ff583c8
perf(integration): add reverse-index for O(1) allocation-free lifecyc…
dmytro-landiak Jun 24, 2026
76ac9ca
chore(config): add dedicated IE events topic and non-blocking events …
dmytro-landiak Jun 24, 2026
dd3d062
feat(integration): add IE-side lifecycle-events opt-in gate util
dmytro-landiak Jun 24, 2026
958d229
feat(integration): provision per-integration events topic + consumer …
dmytro-landiak Jun 24, 2026
7733729
feat(integration): consume per-integration lifecycle events on dedica…
dmytro-landiak Jun 24, 2026
9fcde85
feat(integration): isolate events-consumer start failures from data s…
dmytro-landiak Jun 24, 2026
2d792db
fix(integration): disable synchronous topic creation on IE events pro…
dmytro-landiak Jun 24, 2026
12e84d6
feat(integration): support events-only and messages+events delivery m…
dmytro-landiak Jun 25, 2026
9f24044
feat(integration): add auth/authz lifecycle event proto fields and en…
dmytro-landiak Jun 25, 2026
714dacf
feat(integration): stash authenticated username on session ctx + auth…
dmytro-landiak Jun 25, 2026
5fbf104
feat(integration): propagate username + enrich CLIENT_CONNECTED; publ…
dmytro-landiak Jun 25, 2026
f10112a
feat(integration): emit CLIENT_AUTHENTICATED on auth success and failure
dmytro-landiak Jun 25, 2026
666c5a4
feat(integration): emit CLIENT_AUTHORIZED on publish/subscribe denial…
dmytro-landiak Jun 25, 2026
e621961
feat(integration): build event body for username, auth, and authz lif…
dmytro-landiak Jun 25, 2026
b06a347
fix(integration): drop misleading authMethod from CLIENT_AUTHENTICATE…
dmytro-landiak Jun 25, 2026
5d5aef4
fix(integration): emit CLIENT_AUTHORIZED on every authz denial, drop …
dmytro-landiak Jun 25, 2026
3d684fd
fix(integration): skip CLIENT_AUTHORIZED emission when session not ye…
dmytro-landiak Jun 25, 2026
d96be97
feat(integration): add authentication/authorization lifecycle events …
dmytro-landiak Jun 25, 2026
c6f66df
refactor(session): tidy up ClientSessionCtx and extract EnhancedAuthS…
dmytro-landiak Jun 25, 2026
a006f42
fix(integration): refine topic filters & lifecycle events UI
deaflynx Jun 29, 2026
921d5d1
refactor(integration): emit auth lifecycle events on failure only
dmytro-landiak Jun 30, 2026
2aa350c
fix(integration): emit CLIENT_DISCONNECTED on cross-node session take…
dmytro-landiak Jun 30, 2026
636af2f
fix(integration): emit CLIENT_UNSUBSCRIBED only for actually-removed …
dmytro-landiak Jun 30, 2026
cbbe852
refactor(integration): reuse BytesUtil.toHostAddress for lifecycle IP…
dmytro-landiak Jun 30, 2026
690a572
refactor(integration): extract generic IE consume loop shared by data…
dmytro-landiak Jun 30, 2026
f257e8a
refactor(integration): centralize lifecycle event type key and parsing
dmytro-landiak Jun 30, 2026
4bacee9
docs(integration): document newSessionBuilder sessionInfo precondition
dmytro-landiak Jun 30, 2026
ef4a1ca
test(integration): cover subscribe-side lifecycle event emissions
dmytro-landiak Jun 30, 2026
1d7a4d6
docs(integration): correct IE event producer blocking-bound comment
dmytro-landiak Jun 30, 2026
94f2d5d
fix(integration): emit bare topic filter on shared-sub unsubscribe
dmytro-landiak Jun 30, 2026
276b3d7
refactor(integration): switch lifecycle event body builder on enum
dmytro-landiak Jun 30, 2026
dc2a860
refactor(integration): type-safe action for authorization-denied events
dmytro-landiak Jun 30, 2026
c7b2583
refactor(integration): drop clientInitiated from CLIENT_DISCONNECTED …
dmytro-landiak Jun 30, 2026
63ff640
refactor(integration): omit empty string fields from lifecycle event …
dmytro-landiak Jun 30, 2026
6d524b4
refactor(integration): use StringUtils.isNotEmpty in putIfNotEmpty
dmytro-landiak Jun 30, 2026
2f5b5ed
refactor(integration): drop anonymous from CLIENT_AUTHENTICATION_FAIL…
dmytro-landiak Jun 30, 2026
498464a
feat(integration): add CLIENT_CONNECTION_FAILED lifecycle event
dmytro-landiak Jun 30, 2026
d5c2ea9
fix(integration): don't emit phantom CLIENT_DISCONNECTED for refused …
dmytro-landiak Jun 30, 2026
63f0e94
refactor(integration): emit MQTT-standard reason code in CLIENT_DISCO…
dmytro-landiak Jun 30, 2026
c5accc8
feat(integration): include full subscription details in CLIENT_SUBSCR…
dmytro-landiak Jul 1, 2026
f6e7619
feat(integration): emit processing stats for the lifecycle-events stream
dmytro-landiak Jul 1, 2026
8dcd156
feat(integration): carry share name in CLIENT_UNSUBSCRIBED event
dmytro-landiak Jul 1, 2026
209ad33
feat(integration): emit CLIENT_CONNECTION_FAILED for pre-connection r…
dmytro-landiak Jul 1, 2026
231e3a0
feat(integration): give the lifecycle-events stream its own processin…
dmytro-landiak Jul 1, 2026
dcc2a2a
refactor(integration): emit MQTT CONNACK reason code in CLIENT_CONNEC…
dmytro-landiak Jul 1, 2026
fabcc22
refactor(integration): extract shared ack-strategy properties base
dmytro-landiak Jul 1, 2026
9be2649
fix(integration): carry auth-failure cause in CLIENT_AUTHENTICATION_F…
dmytro-landiak Jul 1, 2026
5ca59fa
docs(integration): correct processQueue stats-sink javadoc
dmytro-landiak Jul 1, 2026
5923bb0
feat(integration): localize lifecycle-events UI strings (de/es/zh/hi)
dmytro-landiak Jul 1, 2026
6cfdf19
feat(integration): add clientCertCn to client lifecycle events
dmytro-landiak Jul 1, 2026
7be4a2c
fix(integration): count async lifecycle-event send failures and drop …
dmytro-landiak Jul 1, 2026
3e99874
fix(integration): log enclosing publisher class in IE msg send callback
dmytro-landiak Jul 1, 2026
0df8785
test(integration): assert authorization-denied event on publish denial
dmytro-landiak Jul 1, 2026
3bfafec
refactor(stats): move DROPPED_LIFECYCLE_EVENTS to StatsConstantNames
dmytro-landiak Jul 1, 2026
6e6b9e2
refactor(integration): move lifecycle-event config to dedicated queue…
dmytro-landiak Jul 2, 2026
cc4acab
fix(integration): preserve offset order when reprocessing a RETRY_ALL…
dmytro-landiak Jul 2, 2026
0ab668d
refactor(integration): pass lifecycle event body as ObjectNode to doP…
dmytro-landiak Jul 2, 2026
65451a4
fix(integration): require static MQTT topic when lifecycle events are…
dmytro-landiak Jul 2, 2026
85ed9e6
Revert "fix(integration): require static MQTT topic when lifecycle ev…
dmytro-landiak Jul 2, 2026
95cd904
feat(integration): add dedicated events topic validation for MQTT int…
dmytro-landiak Jul 2, 2026
cbbef7e
feat(integration): require valid events topic when MQTT lifecycle eve…
dmytro-landiak Jul 2, 2026
d2b6cb7
feat(integration): publish MQTT lifecycle events to dedicated events …
dmytro-landiak Jul 2, 2026
2284179
feat(integration): add events topic field to MQTT integration form
dmytro-landiak Jul 2, 2026
245cf24
feat(integration): localize MQTT events topic field labels
dmytro-landiak Jul 2, 2026
c635c03
fix(integration): align MQTT events validation opt-in with runtime op…
dmytro-landiak Jul 2, 2026
555f77c
feat(integration): emit protocolVersion for CLIENT_AUTHENTICATION_FAI…
dmytro-landiak Jul 2, 2026
7e02553
refactor(integration): move lifecycle-config handling into the cache
dmytro-landiak Jul 2, 2026
ea25835
refactor(integration): use plain HashMap for the serialized lifecycle…
dmytro-landiak Jul 2, 2026
0c43fd8
fix(integration): log actual cached count when loading lifecycle even…
dmytro-landiak Jul 2, 2026
2cba5d4
feat(integration): expose events topic name in add wizard with autoco…
deaflynx Jul 2, 2026
99d4802
chore(integration): simplify lifecycle events hint
deaflynx Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.thingsboard.mqtt.broker.service.security.authorization.AuthRulePatterns;
import org.thingsboard.mqtt.broker.session.ClientMqttActorManager;
import org.thingsboard.mqtt.broker.session.ClientSessionCtx;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventPublisher;
import org.thingsboard.mqtt.broker.session.DisconnectReason;
import org.thingsboard.mqtt.broker.session.DisconnectReasonType;
import org.thingsboard.mqtt.broker.util.MqttReasonCodeResolver;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ActorProcessorImpl implements ActorProcessor {
private final UnauthorizedClientManager unauthorizedClientManager;
private final BlockedClientService blockedClientService;
private final AuthorizationRoutingService authorizationRoutingService;
private final IntegrationLifecycleEventPublisher integrationLifecycleEventPublisher;

@Override
public void onInit(ClientActorState state, SessionInitMsg sessionInitMsg) {
Expand All @@ -96,11 +98,13 @@ public void onInit(ClientActorState state, SessionInitMsg sessionInitMsg) {
}

AuthContext authContext = buildAuthContext(state, sessionInitMsg);
sessionCtx.setUsername(authContext.getUsername());
AuthResponse authResponse = authorizationRoutingService.executeAuthFlow(authContext);

if (authResponse.notSuccess()) {
log.warn("[{}] Connection is not established due to: {}", state.getClientId(), CONNECTION_REFUSED_NOT_AUTHORIZED);
unauthorizedClientManager.persistClientUnauthorized(state, sessionInitMsg, authResponse.getReason());
integrationLifecycleEventPublisher.publishAuthenticationFailed(sessionCtx, state.getClientId(), authResponse.getReason());
sendConnectionRefusedNotAuthorizedMsgAndCloseChannel(sessionCtx);
return;
}
Expand Down Expand Up @@ -203,10 +207,14 @@ public void onEnhancedReAuth(ClientActorState state, MqttAuthMsg authMsg) {
private void processAuth(ClientActorState state, MqttAuthMsg authMsg, ClientSessionCtx sessionCtx) {
EnhancedAuthContext authContext = buildEnhancedAuthContext(state, authMsg);
EnhancedAuthFinalResponse authResponse = enhancedAuthenticationService.onAuthContinue(sessionCtx, authContext);
sessionCtx.setUsername(authResponse.username());
if (!authResponse.success()) {
resetStateToDisconnected(state);
MqttConnectReturnCode returnCode = getFailureReturnCode(authResponse);
unauthorizedClientManager.persistClientUnauthorized(state, sessionCtx, authResponse);
// Carry the enhanced-auth failure cause (e.g. AUTH_METHOD_MISMATCH) so CLIENT_AUTHENTICATION_FAILED.reason
// describes why authentication failed on both paths, rather than leaking the MQTT CONNACK return-code name here.
integrationLifecycleEventPublisher.publishAuthenticationFailed(sessionCtx, state.getClientId(), authResponse.enhancedAuthFailure().name());
sendConnectionRefusedMsgAndCloseChannel(sessionCtx, returnCode);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.mqtt.broker.actors.client.service;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand All @@ -24,11 +25,15 @@
import org.thingsboard.mqtt.broker.actors.client.service.session.ClientSessionService;
import org.thingsboard.mqtt.broker.actors.client.service.subscription.ClientSubscriptionService;
import org.thingsboard.mqtt.broker.common.data.ClientSessionInfo;
import org.thingsboard.mqtt.broker.common.data.integration.ClientLifecycleEventType;
import org.thingsboard.mqtt.broker.common.data.integration.ClientLifecycleEventTypeUtil;
import org.thingsboard.mqtt.broker.common.data.integration.Integration;
import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription;
import org.thingsboard.mqtt.broker.config.ClientsLimitProperties;
import org.thingsboard.mqtt.broker.dao.integration.IntegrationService;
import org.thingsboard.mqtt.broker.exception.QueuePersistenceException;
import org.thingsboard.mqtt.broker.queue.cluster.ServiceInfoProvider;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventTypeCache;
import org.thingsboard.mqtt.broker.service.limits.RateLimitService;
import org.thingsboard.mqtt.broker.service.mqtt.client.blocked.BlockedClientService;
import org.thingsboard.mqtt.broker.service.mqtt.client.blocked.consumer.BlockedClientConsumerService;
Expand All @@ -50,6 +55,7 @@
import org.thingsboard.mqtt.broker.service.subscription.data.SubscriptionsSourceKey;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -74,6 +80,7 @@ public class BrokerInitializer {
private final ServiceInfoProvider serviceInfoProvider;
private final RateLimitService rateLimitService;
private final IntegrationService integrationService;
private final IntegrationLifecycleEventTypeCache lifecycleEventTypeCache;
private final ClientsLimitProperties clientsLimitProperties;

private final ClientSessionEventConsumer clientSessionEventConsumer;
Expand All @@ -89,6 +96,7 @@ public class BrokerInitializer {
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Initializing Client Sessions and Subscriptions.");
try {
initIntegrationLifecycleEventCache();
Map<String, ClientSessionInfo> allClientSessions = initClientSessions();
initClientSubscriptions(allClientSessions);

Expand All @@ -109,6 +117,25 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
}
}

void initIntegrationLifecycleEventCache() {
List<Integration> integrations = integrationService.findAllIntegrations();
int cached = 0;
for (Integration integration : integrations) {
JsonNode configuration = integration.getConfiguration();
if (configuration == null || !configuration.has(ClientLifecycleEventTypeUtil.LIFECYCLE_EVENT_TYPES_KEY)) {
continue;
}
Set<ClientLifecycleEventType> eventTypes = ClientLifecycleEventTypeUtil.parse(
configuration.get(ClientLifecycleEventTypeUtil.LIFECYCLE_EVENT_TYPES_KEY),
name -> log.warn("[{}] Unknown lifecycle event type: {}", integration.getId(), name));
if (!eventTypes.isEmpty()) {
lifecycleEventTypeCache.put(integration.getIdStr(), eventTypes);
cached++;
}
}
log.info("Loaded lifecycle event type cache: cached {} of {} integrations.", cached, integrations.size());
}

Map<String, ClientSessionInfo> initClientSessions() throws QueuePersistenceException {
Map<String, ClientSessionInfo> allClientSessions = clientSessionConsumer.initLoad();
log.info("Loaded {} stored client sessions from Kafka.", allClientSessions.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.thingsboard.mqtt.broker.service.mqtt.persistence.MsgPersistenceManager;
import org.thingsboard.mqtt.broker.service.mqtt.validation.PublishMsgValidationService;
import org.thingsboard.mqtt.broker.service.mqtt.will.LastWillService;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventPublisher;
import org.thingsboard.mqtt.broker.service.stats.StatsManager;
import org.thingsboard.mqtt.broker.service.subscription.ClientSubscriptionCache;
import org.thingsboard.mqtt.broker.service.subscription.shared.TopicSharedSubscription;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class ConnectServiceImpl implements ConnectService {
private final PublishMsgValidationService publishMsgValidationService;
private final MqttPublishMsgDeliveryService mqttPublishMsgDeliveryService;
private final StatsManager statsManager;
private final IntegrationLifecycleEventPublisher integrationLifecycleEventPublisher;

private ExecutorService connectHandlerExecutor;

Expand Down Expand Up @@ -209,6 +211,7 @@ public void acceptConnection(ClientActorStateInfo actorState, ConnectionAccepted
log.debug("[{}] [{}] Client connected!", actorState.getClientId(), actorState.getCurrentSessionId());

clientSessionCtxService.registerSession(sessionCtx);
integrationLifecycleEventPublisher.publishConnected(sessionCtx);

if (sessionCtx.getSessionInfo().isPersistent()) {
msgPersistenceManager.startProcessingPersistedMessages(actorState);
Expand Down Expand Up @@ -243,12 +246,16 @@ private void pushConnAckMsg(ClientActorStateInfo actorState, ConnectionAcceptedM
void refuseConnection(ClientSessionCtx clientSessionCtx, ClientSessionFailureReason reason, Throwable t) {
logConnectionRefused(clientSessionCtx, reason, t);

sendConnectionRefusedMsgAndDisconnect(clientSessionCtx, reason);
MqttConnectReturnCode returnCode = reason.toMqttReturnCode(clientSessionCtx);
// Emit the same MQTT CONNACK reason-code name the client receives, matching the pre-connection validation
// path (which emits MqttConnectReturnCode.name()) so CLIENT_CONNECTION_FAILED speaks a single vocabulary.
integrationLifecycleEventPublisher.publishConnectionFailed(clientSessionCtx, clientSessionCtx.getSessionInfo(), returnCode.name());

sendConnectionRefusedMsgAndDisconnect(clientSessionCtx, returnCode);
}

private void sendConnectionRefusedMsgAndDisconnect(ClientSessionCtx ctx, ClientSessionFailureReason reason) {
private void sendConnectionRefusedMsgAndDisconnect(ClientSessionCtx ctx, MqttConnectReturnCode mqttReturnCode) {
try {
MqttConnectReturnCode mqttReturnCode = reason.toMqttReturnCode(ctx);
createAndSendConnAckMsg(mqttReturnCode, ctx);
} catch (Exception e) {
log.warn("[{}][{}] Failed to send CONN_ACK response.", ctx.getClientId(), ctx.getSessionId());
Expand Down Expand Up @@ -295,6 +302,9 @@ boolean shouldProceedWithConnection(ClientActorStateInfo actorState, MqttConnect
validateLastWillMessage(ctx, clientId, msg);
} catch (ConnectionValidationException e) {
log.warn("[{}] Connection validation failed: {}", ctx.getSessionId(), e.getMessage());
// ctx.getSessionInfo() is not set yet at this point, so pass the already-built sessionInfo explicitly.
// reason is the MQTT connect return code the client received, e.g. CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID.
integrationLifecycleEventPublisher.publishConnectionFailed(ctx, sessionInfo, e.getMqttConnectReturnCode().name());
Comment thread
dmytro-landiak marked this conversation as resolved.
createAndSendConnAckMsg(e.getMqttConnectReturnCode(), ctx);
disconnect(clientId, ctx.getSessionId());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.thingsboard.mqtt.broker.common.data.ClientInfo;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.service.auth.AuthorizationRuleService;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventPublisher;
import org.thingsboard.mqtt.broker.service.historical.stats.TbMessageStatsReportClient;
import org.thingsboard.mqtt.broker.service.limits.RateLimitService;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMessageGenerator;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class DisconnectServiceImpl implements DisconnectService {
private final FlowControlService flowControlService;
private final TbMessageStatsReportClient tbMessageStatsReportClient;
private final ChannelBackpressureManager channelBackpressureManager;
private final IntegrationLifecycleEventPublisher integrationLifecycleEventPublisher;

@Override
public void disconnect(ClientActorStateInfo actorState, MqttDisconnectMsg disconnectMsg) {
Expand All @@ -85,6 +87,15 @@ public void disconnect(ClientActorStateInfo actorState, MqttDisconnectMsg discon
if (reasonType.isNotClusterConflictingSession()) {
notifyClientDisconnected(actorState, sessionExpiryInterval, reasonType);
}
// Emit the lifecycle CLIENT_DISCONNECTED on every disconnect, including cross-node session takeover
// (ON_CLUSTER_CONFLICTING_SESSIONS). The session-event notification above is intentionally suppressed
// on takeover, but the lifecycle event must still fire to pair with the CLIENT_CONNECTED this node emitted.
// Exception: a broker-refused connection (ON_CONNECTION_FAILURE) never established a session and never
// emitted CLIENT_CONNECTED, so its teardown must not emit a phantom CLIENT_DISCONNECTED; the dedicated
// CLIENT_CONNECTION_FAILED event covers that case instead.
if (reasonType != DisconnectReasonType.ON_CONNECTION_FAILURE) {
integrationLifecycleEventPublisher.publishDisconnected(sessionCtx, reasonType);
}
cleanupClientSession(actorState, disconnectMsg, sessionExpiryInterval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.thingsboard.mqtt.broker.dao.topic.TopicValidationService;
import org.thingsboard.mqtt.broker.exception.DataValidationException;
import org.thingsboard.mqtt.broker.service.auth.AuthorizationRuleService;
import org.thingsboard.mqtt.broker.service.integration.AuthorizationAction;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventPublisher;
import org.thingsboard.mqtt.broker.service.limits.RateLimitService;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMessageGenerator;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMsgDeliveryService;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class MqttSubscribeHandler {
private final MsgPersistenceManager msgPersistenceManager;
private final ApplicationPersistenceProcessor applicationPersistenceProcessor;
private final RateLimitService rateLimitService;
private final IntegrationLifecycleEventPublisher integrationLifecycleEventPublisher;

public void process(ClientSessionCtx ctx, MqttSubscribeMsg msg) {
Set<TopicSharedSubscription> currentSharedSubscriptions = clientSubscriptionService.getClientSharedSubscriptions(ctx.getClientId());
Expand Down Expand Up @@ -132,6 +135,7 @@ List<MqttReasonCodes.SubAck> collectMqttReasonCodes(ClientSessionCtx ctx, MqttSu
if (!isClientAuthorized) {
log.warn("[{}][{}] Client is not authorized to subscribe to the topic {}",
ctx.getClientId(), ctx.getSessionId(), topic);
integrationLifecycleEventPublisher.publishAuthorizationDenied(ctx, AuthorizationAction.SUBSCRIBE, topic);
codes.add(MqttReasonCodeResolver.notAuthorizedSubscribe(ctx));
continue;
}
Expand Down Expand Up @@ -162,6 +166,7 @@ private void subscribeAndPersist(ClientSessionCtx ctx, List<TopicSubscription> n
CallbackUtil.createCallback(
() -> {
sendSubAck(ctx, subAckMessage);
integrationLifecycleEventPublisher.publishSubscribed(ctx, newSubscriptions);
processRetainedMessages(ctx, newSubscriptions, currentSubscriptions);
},
t -> log.warn("[{}][{}] Failed to process client subscription.", clientId, ctx.getSessionId(), t))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttUnsubscribeMsg;
import org.thingsboard.mqtt.broker.actors.client.service.subscription.ClientSubscriptionService;
import org.thingsboard.mqtt.broker.adaptor.NettyMqttConverter;
import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription;
import org.thingsboard.mqtt.broker.common.data.util.CallbackUtil;
import org.thingsboard.mqtt.broker.service.integration.IntegrationLifecycleEventPublisher;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMessageGenerator;
import org.thingsboard.mqtt.broker.service.mqtt.persistence.application.ApplicationPersistenceProcessor;
import org.thingsboard.mqtt.broker.service.subscription.shared.TopicSharedSubscription;
import org.thingsboard.mqtt.broker.session.ClientSessionCtx;
import org.thingsboard.mqtt.broker.util.MqttReasonCodeResolver;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -43,20 +48,59 @@ public class MqttUnsubscribeHandler {
private final MqttMessageGenerator mqttMessageGenerator;
private final ClientSubscriptionService clientSubscriptionService;
private final ApplicationPersistenceProcessor applicationPersistenceProcessor;
private final IntegrationLifecycleEventPublisher integrationLifecycleEventPublisher;

public void process(ClientSessionCtx ctx, MqttUnsubscribeMsg msg) {
log.trace("[{}][{}] Processing unsubscribe, messageId - {}, topic filters - {}", ctx.getClientId(), ctx.getSessionId(), msg.getMessageId(), msg.getTopics());

MqttMessage unSubAckMessage = mqttMessageGenerator.createUnSubAckMessage(msg.getMessageId(), getCodes(ctx, msg));
// MQTT allows UNSUBSCRIBE for filters the client never subscribed to. Emit CLIENT_UNSUBSCRIBED only for
// the subscriptions actually removed (symmetric with CLIENT_SUBSCRIBED, which emits only the granted ones).
List<TopicSubscription> removedSubscriptions = getRemovedSubscriptions(ctx.getClientId(), msg.getTopics());
clientSubscriptionService.unsubscribeAndPersist(ctx.getClientId(), msg.getTopics(),
CallbackUtil.createCallback(
() -> ctx.getChannel().writeAndFlush(unSubAckMessage),
() -> {
ctx.getChannel().writeAndFlush(unSubAckMessage);
if (!removedSubscriptions.isEmpty()) {
integrationLifecycleEventPublisher.publishUnsubscribed(ctx, removedSubscriptions);
}
},
t -> log.warn("[{}][{}] Failed to process client unsubscription", ctx.getClientId(), ctx.getSessionId(), t)
));

stopProcessingApplicationSharedSubscriptions(ctx, msg.getTopics());
}

private List<TopicSubscription> getRemovedSubscriptions(String clientId, List<String> requestedTopics) {
Set<TopicSubscription> currentSubscriptions = clientSubscriptionService.getClientSubscriptions(clientId);
if (CollectionUtils.isEmpty(currentSubscriptions)) {
return List.of();
}
// Index current subscriptions by (shareName, bare topic filter) so a requested $share/<group>/<filter>
// resolves to that exact shared subscription (carrying its shareName), and a bare filter to the regular one.
Map<SubKey, TopicSubscription> byKey = new HashMap<>();
for (TopicSubscription sub : currentSubscriptions) {
byKey.putIfAbsent(new SubKey(sub.getShareName(), sub.getTopicFilter()), sub);
}
List<TopicSubscription> removed = new ArrayList<>();
for (String requested : requestedTopics) {
String shareName = NettyMqttConverter.isSharedTopic(requested) ? NettyMqttConverter.getShareName(requested) : null;
TopicSubscription sub = byKey.get(new SubKey(shareName, toTopicFilter(requested)));
if (sub != null) {
removed.add(sub);
}
}
return removed;
}

// Composite lookup key; shareName is null for a regular subscription, the group name for a shared one.
private record SubKey(String shareName, String topicFilter) {
}

private static String toTopicFilter(String topic) {
return NettyMqttConverter.isSharedTopic(topic) ? NettyMqttConverter.getTopicFilter(topic) : topic;
}

private List<UnsubAck> getCodes(ClientSessionCtx ctx, MqttUnsubscribeMsg msg) {
return msg
.getTopics()
Expand Down
Loading