@@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
38
38
39
39
ConsumerImpl::ConsumerImpl (const ClientImplPtr client, const std::string& topic,
40
40
const std::string& subscriptionName, const ConsumerConfiguration& conf,
41
+ bool isPersistent,
41
42
const ExecutorServicePtr listenerExecutor /* = NULL by default */ ,
42
43
bool hasParent /* = false by default */ ,
43
44
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */ ,
@@ -47,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
47
48
config_(conf),
48
49
subscription_(subscriptionName),
49
50
originalSubscriptionName_(subscriptionName),
51
+ isPersistent_(isPersistent),
50
52
messageListener_(config_.getMessageListener()),
51
53
eventListener_(config_.getConsumerEventListener()),
52
54
hasParent_(hasParent),
@@ -169,14 +171,17 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
169
171
// sending the subscribe request.
170
172
cnx->registerConsumer (consumerId_, shared_from_this ());
171
173
172
- Lock lockForMessageId (mutexForMessageId_);
173
- Optional<MessageId> firstMessageInQueue = clearReceiveQueue ();
174
- if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
175
- // Update startMessageId so that we can discard messages after delivery
176
- // restarts
177
- startMessageId_ = firstMessageInQueue;
174
+ if (duringSeek_) {
175
+ ackGroupingTrackerPtr_->flushAndClean ();
178
176
}
179
- const auto startMessageId = startMessageId_;
177
+
178
+ Lock lockForMessageId (mutexForMessageId_);
179
+ // Update startMessageId so that we can discard messages after delivery restarts
180
+ const auto startMessageId = clearReceiveQueue ();
181
+ const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
182
+ ? startMessageId
183
+ : Optional<MessageId>::empty ();
184
+ startMessageId_ = startMessageId;
180
185
lockForMessageId.unlock ();
181
186
182
187
unAckedMessageTrackerPtr_->clear ();
@@ -186,7 +191,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
186
191
uint64_t requestId = client->newRequestId ();
187
192
SharedBuffer cmd = Commands::newSubscribe (
188
193
topic_, subscription_, consumerId_, requestId, getSubType (), consumerName_, subscriptionMode_,
189
- startMessageId , readCompacted_, config_.getProperties (), config_.getSubscriptionProperties (),
194
+ subscribeMessageId , readCompacted_, config_.getProperties (), config_.getSubscriptionProperties (),
190
195
config_.getSchema (), getInitialPosition (), config_.isReplicateSubscriptionStateEnabled (),
191
196
config_.getKeySharedPolicy (), config_.getPriorityLevel ());
192
197
cnx->sendRequestWithId (cmd, requestId)
@@ -397,12 +402,12 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
397
402
return ;
398
403
}
399
404
400
- const bool isMessageDecryptable =
401
- metadata.encryption_keys_size () <= 0 || config_.getCryptoKeyReader ().get () ||
405
+ const bool isMessageUndecryptable =
406
+ metadata.encryption_keys_size () > 0 && ! config_.getCryptoKeyReader ().get () &&
402
407
config_.getCryptoFailureAction () == ConsumerCryptoFailureAction::CONSUME;
403
408
404
409
const bool isChunkedMessage = metadata.num_chunks_from_msg () > 1 ;
405
- if (isMessageDecryptable && !isChunkedMessage) {
410
+ if (!isMessageUndecryptable && !isChunkedMessage) {
406
411
if (!uncompressMessageIfNeeded (cnx, msg.message_id (), metadata, payload, true )) {
407
412
// Message was discarded on decompression error
408
413
return ;
@@ -446,6 +451,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
446
451
Lock lock (mutex_);
447
452
numOfMessageReceived = receiveIndividualMessagesFromBatch (cnx, m, msg.redelivery_count ());
448
453
} else {
454
+ const auto startMessageId = startMessageId_.get ();
455
+ if (isPersistent_ && startMessageId.is_present () &&
456
+ m.getMessageId ().ledgerId () == startMessageId.value ().ledgerId () &&
457
+ m.getMessageId ().entryId () == startMessageId.value ().entryId () &&
458
+ isPriorEntryIndex (m.getMessageId ().entryId ())) {
459
+ LOG_DEBUG (getName () << " Ignoring message from before the startMessageId: "
460
+ << startMessageId.value ());
461
+ return ;
462
+ }
463
+
449
464
Lock lock (pendingReceiveMutex_);
450
465
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
451
466
bool asyncReceivedWaiting = !pendingReceives_.empty ();
@@ -533,9 +548,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
533
548
batchAcknowledgementTracker_.receivedMessage (batchedMessage);
534
549
LOG_DEBUG (" Received Batch messages of size - " << batchSize
535
550
<< " -- msgId: " << batchedMessage.getMessageId ());
536
- Lock lock (mutexForMessageId_);
537
- const auto startMessageId = startMessageId_;
538
- lock.unlock ();
551
+ const auto startMessageId = startMessageId_.get ();
539
552
540
553
int skippedMessages = 0 ;
541
554
@@ -550,9 +563,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
550
563
551
564
// If we are receiving a batch message, we need to discard messages that were prior
552
565
// to the startMessageId
553
- if (msgId.ledgerId () == startMessageId.value ().ledgerId () &&
566
+ if (isPersistent_ && msgId.ledgerId () == startMessageId.value ().ledgerId () &&
554
567
msgId.entryId () == startMessageId.value ().entryId () &&
555
- msgId.batchIndex () <= startMessageId. value (). batchIndex ( )) {
568
+ isPriorBatchIndex ( msgId.batchIndex ())) {
556
569
LOG_DEBUG (getName () << " Ignoring message from before the startMessageId"
557
570
<< msg.getMessageId ());
558
571
++skippedMessages;
@@ -842,6 +855,12 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
842
855
* not seen by the application
843
856
*/
844
857
Optional<MessageId> ConsumerImpl::clearReceiveQueue () {
858
+ bool expectedDuringSeek = true ;
859
+ if (duringSeek_.compare_exchange_strong (expectedDuringSeek, false )) {
860
+ return Optional<MessageId>::of (seekMessageId_.get ());
861
+ } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
862
+ return startMessageId_.get ();
863
+ }
845
864
Message nextMessageInQueue;
846
865
if (incomingMessages_.peekAndClear (nextMessageInQueue)) {
847
866
// There was at least one message pending in the queue
@@ -862,7 +881,7 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
862
881
} else {
863
882
// No message was received or dequeued by this consumer. Next message would still be the
864
883
// startMessageId
865
- return startMessageId_;
884
+ return startMessageId_. get () ;
866
885
}
867
886
}
868
887
@@ -1175,18 +1194,6 @@ void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsIm
1175
1194
}
1176
1195
}
1177
1196
1178
- void ConsumerImpl::handleSeek (Result result, ResultCallback callback) {
1179
- if (result == ResultOk) {
1180
- Lock lock (mutexForMessageId_);
1181
- lastDequedMessageId_ = MessageId::earliest ();
1182
- lock.unlock ();
1183
- LOG_INFO (getName () << " Seek successfully" );
1184
- } else {
1185
- LOG_ERROR (getName () << " Failed to seek: " << strResult (result));
1186
- }
1187
- callback (result);
1188
- }
1189
-
1190
1197
void ConsumerImpl::seekAsync (const MessageId& msgId, ResultCallback callback) {
1191
1198
const auto state = state_.load ();
1192
1199
if (state == Closed || state == Closing) {
@@ -1197,25 +1204,13 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
1197
1204
return ;
1198
1205
}
1199
1206
1200
- this ->ackGroupingTrackerPtr_ ->flushAndClean ();
1201
- ClientConnectionPtr cnx = getCnx ().lock ();
1202
- if (cnx) {
1203
- ClientImplPtr client = client_.lock ();
1204
- uint64_t requestId = client->newRequestId ();
1205
- LOG_DEBUG (getName () << " Sending seek Command for Consumer - " << getConsumerId () << " , requestId - "
1206
- << requestId);
1207
- Future<Result, ResponseData> future =
1208
- cnx->sendRequestWithId (Commands::newSeek (consumerId_, requestId, msgId), requestId);
1209
-
1210
- if (callback) {
1211
- future.addListener (
1212
- std::bind (&ConsumerImpl::handleSeek, shared_from_this (), std::placeholders::_1, callback));
1213
- }
1207
+ ClientImplPtr client = client_.lock ();
1208
+ if (!client) {
1209
+ LOG_ERROR (getName () << " Client is expired when seekAsync " << msgId);
1214
1210
return ;
1215
1211
}
1216
-
1217
- LOG_ERROR (getName () << " Client Connection not ready for Consumer" );
1218
- callback (ResultNotConnected);
1212
+ const auto requestId = client->newRequestId ();
1213
+ seekAsyncInternal (requestId, Commands::newSeek (consumerId_, requestId, msgId), msgId, 0L , callback);
1219
1214
}
1220
1215
1221
1216
void ConsumerImpl::seekAsync (uint64_t timestamp, ResultCallback callback) {
@@ -1228,24 +1223,14 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
1228
1223
return ;
1229
1224
}
1230
1225
1231
- ClientConnectionPtr cnx = getCnx ().lock ();
1232
- if (cnx) {
1233
- ClientImplPtr client = client_.lock ();
1234
- uint64_t requestId = client->newRequestId ();
1235
- LOG_DEBUG (getName () << " Sending seek Command for Consumer - " << getConsumerId () << " , requestId - "
1236
- << requestId);
1237
- Future<Result, ResponseData> future =
1238
- cnx->sendRequestWithId (Commands::newSeek (consumerId_, requestId, timestamp), requestId);
1239
-
1240
- if (callback) {
1241
- future.addListener (
1242
- std::bind (&ConsumerImpl::handleSeek, shared_from_this (), std::placeholders::_1, callback));
1243
- }
1226
+ ClientImplPtr client = client_.lock ();
1227
+ if (!client) {
1228
+ LOG_ERROR (getName () << " Client is expired when seekAsync " << timestamp);
1244
1229
return ;
1245
1230
}
1246
-
1247
- LOG_ERROR ( getName () << " Client Connection not ready for Consumer " );
1248
- callback (ResultNotConnected );
1231
+ const auto requestId = client-> newRequestId ();
1232
+ seekAsyncInternal (requestId, Commands::newSeek (consumerId_, requestId, timestamp), MessageId::earliest (),
1233
+ timestamp, callback);
1249
1234
}
1250
1235
1251
1236
bool ConsumerImpl::isReadCompacted () { return readCompacted_; }
@@ -1255,9 +1240,10 @@ inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const Messag
1255
1240
}
1256
1241
1257
1242
void ConsumerImpl::hasMessageAvailableAsync (HasMessageAvailableCallback callback) {
1243
+ const auto startMessageId = startMessageId_.get ();
1258
1244
Lock lock (mutexForMessageId_);
1259
1245
const auto messageId =
1260
- (lastDequedMessageId_ == MessageId::earliest ()) ? startMessageId_ .value () : lastDequedMessageId_;
1246
+ (lastDequedMessageId_ == MessageId::earliest ()) ? startMessageId .value () : lastDequedMessageId_;
1261
1247
1262
1248
if (messageId == MessageId::latest ()) {
1263
1249
lock.unlock ();
@@ -1380,4 +1366,57 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
1380
1366
1381
1367
uint64_t ConsumerImpl::getNumberOfConnectedConsumer () { return isConnected () ? 1 : 0 ; }
1382
1368
1369
+ void ConsumerImpl::seekAsyncInternal (long requestId, SharedBuffer seek, const MessageId& seekId,
1370
+ long timestamp, ResultCallback callback) {
1371
+ ClientConnectionPtr cnx = getCnx ().lock ();
1372
+ if (!cnx) {
1373
+ LOG_ERROR (getName () << " Client Connection not ready for Consumer" );
1374
+ callback (ResultNotConnected);
1375
+ return ;
1376
+ }
1377
+
1378
+ const auto originalSeekMessageId = seekMessageId_.get ();
1379
+ seekMessageId_ = seekId;
1380
+ duringSeek_ = true ;
1381
+ if (timestamp > 0 ) {
1382
+ LOG_INFO (getName () << " Seeking subscription to " << timestamp);
1383
+ } else {
1384
+ LOG_INFO (getName () << " Seeking subscription to " << seekId);
1385
+ }
1386
+
1387
+ std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this ()};
1388
+
1389
+ cnx->sendRequestWithId (seek, requestId)
1390
+ .addListener ([this , weakSelf, callback, originalSeekMessageId](Result result,
1391
+ const ResponseData& responseData) {
1392
+ auto self = weakSelf.lock ();
1393
+ if (!self) {
1394
+ callback (result);
1395
+ return ;
1396
+ }
1397
+ if (result == ResultOk) {
1398
+ LOG_INFO (getName () << " Seek successfully" );
1399
+ ackGroupingTrackerPtr_->flushAndClean ();
1400
+ Lock lock (mutexForMessageId_);
1401
+ lastDequedMessageId_ = MessageId::earliest ();
1402
+ lock.unlock ();
1403
+ } else {
1404
+ LOG_ERROR (getName () << " Failed to seek: " << result);
1405
+ seekMessageId_ = originalSeekMessageId;
1406
+ duringSeek_ = false ;
1407
+ }
1408
+ callback (result);
1409
+ });
1410
+ }
1411
+
1412
+ bool ConsumerImpl::isPriorBatchIndex (int32_t idx) {
1413
+ return config_.isStartMessageIdInclusive () ? idx < startMessageId_.get ().value ().batchIndex ()
1414
+ : idx <= startMessageId_.get ().value ().batchIndex ();
1415
+ }
1416
+
1417
+ bool ConsumerImpl::isPriorEntryIndex (int64_t idx) {
1418
+ return config_.isStartMessageIdInclusive () ? idx < startMessageId_.get ().value ().entryId ()
1419
+ : idx <= startMessageId_.get ().value ().entryId ();
1420
+ }
1421
+
1383
1422
} /* namespace pulsar */
0 commit comments