Skip to content

Commit 7f7653b

Browse files
authored
[feat] Consumer support batch receive messages. (#21)
### Motivation apache/pulsar#17140 This PR has been reviewed in [pulsar repo](apache/pulsar#17429). ### Modifications - Consumer support batch receives messages. - Abstract common implementation to `ConsumerImplBase`.
1 parent c1a9808 commit 7f7653b

22 files changed

+1037
-149
lines changed

include/pulsar/BatchReceivePolicy.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef BATCH_RECEIVE_POLICY_HPP_
20+
#define BATCH_RECEIVE_POLICY_HPP_
21+
22+
#include <pulsar/defines.h>
23+
#include <memory>
24+
25+
namespace pulsar {
26+
27+
struct BatchReceivePolicyImpl;
28+
29+
/**
30+
* Configuration for message batch receive {@link Consumer#batchReceive()} {@link
31+
* Consumer#batchReceiveAsync()}.
32+
*
33+
* <p>Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a
34+
* timeout for waiting for enough messages for this batch.
35+
*
36+
* <p>A batch receive action is completed as long as any one of the
37+
* conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met.
38+
*
39+
* <p>Examples:
40+
* 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it
41+
* means {@link Consumer#batchReceive()} will always wait until there is enough messages.
42+
* 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it
43+
* means {@link Consumer#batchReceive()} will wait for 100ms no matter whether there are enough messages.
44+
*
45+
* <p>Note:
46+
* Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout.
47+
* Otherwise, {@link Messages} ingest {@link Message} will never end.
48+
*
49+
* @since 2.4.1
50+
*/
51+
class PULSAR_PUBLIC BatchReceivePolicy {
52+
public:
53+
/**
54+
* Default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
55+
*/
56+
BatchReceivePolicy();
57+
58+
/**
59+
*
60+
* @param maxNumMessage Max num message, if less than 0, it means no limit.
61+
* @param maxNumBytes Max num bytes, if less than 0, it means no limit.
62+
* @param timeoutMs If less than 0, it means no limit.
63+
*/
64+
BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs);
65+
66+
/**
67+
* Get max time out ms.
68+
*
69+
* @return
70+
*/
71+
long getTimeoutMs() const;
72+
73+
/**
74+
* Get the maximum number of messages.
75+
* @return
76+
*/
77+
int getMaxNumMessages() const;
78+
79+
/**
80+
* Get max num bytes.
81+
* @return
82+
*/
83+
long getMaxNumBytes() const;
84+
85+
private:
86+
std::shared_ptr<BatchReceivePolicyImpl> impl_;
87+
};
88+
} // namespace pulsar
89+
90+
#endif /* BATCH_RECEIVE_POLICY_HPP_ */

include/pulsar/Consumer.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,31 @@ class PULSAR_PUBLIC Consumer {
113113
*/
114114
void receiveAsync(ReceiveCallback callback);
115115

116+
/**
117+
* Batch receiving messages.
118+
*
119+
* <p>This calls blocks until has enough messages or wait timeout, more details to see {@link
120+
* BatchReceivePolicy}.
121+
*
122+
* @param msgs a non-const reference where the received messages will be copied
123+
* @return ResultOk when a message is received
124+
* @return ResultInvalidConfiguration if a message listener had been set in the configuration
125+
*/
126+
Result batchReceive(Messages& msgs);
127+
128+
/**
129+
* Async Batch receiving messages.
130+
* <p>
131+
* Retrieves a message when it will be available and completes callback with received message.
132+
* </p>
133+
* <p>
134+
* batchReceiveAsync() should be called subsequently once callback gets completed with received message.
135+
* Else it creates <i> backlog of receive requests </i> in the application.
136+
* </p>
137+
* @param BatchReceiveCallback will be completed when messages are available.
138+
*/
139+
void batchReceiveAsync(BatchReceiveCallback callback);
140+
116141
/**
117142
* Acknowledge the reception of a single message.
118143
*

include/pulsar/ConsumerConfiguration.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@
3131
#include <pulsar/InitialPosition.h>
3232
#include <pulsar/KeySharedPolicy.h>
3333
#include <pulsar/ConsumerEventListener.h>
34+
#include "BatchReceivePolicy.h"
3435

3536
namespace pulsar {
3637

3738
class Consumer;
3839
class PulsarWrapper;
3940

4041
/// Callback definition for non-data operation
42+
typedef std::vector<Message> Messages;
4143
typedef std::function<void(Result result)> ResultCallback;
4244
typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
45+
typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
4346
typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
4447

4548
/// Callback definition for MessageListener
@@ -378,6 +381,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
378381
*/
379382
InitialPosition getSubscriptionInitialPosition() const;
380383

384+
/**
385+
* Set batch receive policy.
386+
*
387+
* @param batchReceivePolicy the default is
388+
* {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
389+
*/
390+
void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
391+
392+
/**
393+
* Get batch receive policy.
394+
*
395+
* @return batch receive policy
396+
*/
397+
const BatchReceivePolicy& getBatchReceivePolicy() const;
398+
381399
/**
382400
* Set whether the subscription status should be replicated.
383401
* The default value is `false`.

lib/BatchReceivePolicy.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <pulsar/BatchReceivePolicy.h>
21+
#include "BatchReceivePolicyImpl.h"
22+
#include "LogUtils.h"
23+
24+
using namespace pulsar;
25+
26+
namespace pulsar {
27+
28+
DECLARE_LOG_OBJECT()
29+
30+
BatchReceivePolicy::BatchReceivePolicy() : BatchReceivePolicy(-1, 10 * 1024 * 1024, 100) {}
31+
32+
BatchReceivePolicy::BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs)
33+
: impl_(std::make_shared<BatchReceivePolicyImpl>()) {
34+
if (maxNumMessage <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {
35+
throw std::invalid_argument(
36+
"At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
37+
}
38+
if (maxNumMessage <= 0 && maxNumBytes <= 0) {
39+
impl_->maxNumMessage = -1;
40+
impl_->maxNumBytes = 10 * 1024 * 1024;
41+
LOG_WARN(
42+
"BatchReceivePolicy maxNumMessages and maxNumBytes is less than 0. Reset to default: "
43+
"maxNumMessage(-1), maxNumBytes(10 * 1024 * 10)");
44+
} else {
45+
impl_->maxNumMessage = maxNumMessage;
46+
impl_->maxNumBytes = maxNumBytes;
47+
}
48+
impl_->timeoutMs = timeoutMs;
49+
}
50+
51+
long BatchReceivePolicy::getTimeoutMs() const { return impl_->timeoutMs; }
52+
53+
int BatchReceivePolicy::getMaxNumMessages() const { return impl_->maxNumMessage; }
54+
55+
long BatchReceivePolicy::getMaxNumBytes() const { return impl_->maxNumBytes; }
56+
57+
} // namespace pulsar

lib/BatchReceivePolicyImpl.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
namespace pulsar {
22+
23+
struct BatchReceivePolicyImpl {
24+
int maxNumMessage;
25+
long maxNumBytes;
26+
long timeoutMs;
27+
};
28+
29+
} // namespace pulsar

lib/ClientImpl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,12 @@
2828
#include <mutex>
2929
#include <lib/TopicName.h>
3030
#include "ProducerImplBase.h"
31-
#include "ConsumerImplBase.h"
3231
#include <atomic>
3332
#include <vector>
3433
#include "ServiceNameResolver.h"
3534

3635
namespace pulsar {
3736

38-
class ClientImpl;
3937
class PulsarFriend;
4038
typedef std::shared_ptr<ClientImpl> ClientImplPtr;
4139
typedef std::weak_ptr<ClientImpl> ClientImplWeakPtr;
@@ -44,6 +42,9 @@ class ReaderImpl;
4442
typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
4543
typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
4644

45+
class ConsumerImplBase;
46+
typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
47+
4748
std::string generateRandomName();
4849

4950
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

lib/Consumer.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,24 @@ void Consumer::receiveAsync(ReceiveCallback callback) {
8282
impl_->receiveAsync(callback);
8383
}
8484

85+
Result Consumer::batchReceive(Messages& msgs) {
86+
if (!impl_) {
87+
return ResultConsumerNotInitialized;
88+
}
89+
Promise<Result, Messages> promise;
90+
impl_->batchReceiveAsync(WaitForCallbackValue<Messages>(promise));
91+
return promise.getFuture().get(msgs);
92+
}
93+
94+
void Consumer::batchReceiveAsync(BatchReceiveCallback callback) {
95+
if (!impl_) {
96+
Messages msgs;
97+
callback(ResultConsumerNotInitialized, msgs);
98+
return;
99+
}
100+
impl_->batchReceiveAsync(callback);
101+
}
102+
85103
Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
86104

87105
Result Consumer::acknowledge(const MessageId& messageId) {

lib/ConsumerConfiguration.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <lib/ConsumerConfigurationImpl.h>
2020

2121
#include <stdexcept>
22+
#include <pulsar/ConsumerConfiguration.h>
2223

2324
namespace pulsar {
2425

@@ -267,4 +268,12 @@ ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool st
267268

268269
bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; }
269270

271+
void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) {
272+
impl_->batchReceivePolicy = batchReceivePolicy;
273+
}
274+
275+
const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
276+
return impl_->batchReceivePolicy;
277+
}
278+
270279
} // namespace pulsar

lib/ConsumerConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl {
4545
ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
4646
bool readCompacted{false};
4747
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
48+
BatchReceivePolicy batchReceivePolicy{};
4849
int patternAutoDiscoveryPeriod{60};
4950
bool replicateSubscriptionStateEnabled{false};
5051
std::map<std::string, std::string> properties;

0 commit comments

Comments
 (0)