Skip to content

Commit ac44712

Browse files
committed
Add changes
1 parent d391f5b commit ac44712

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

README.md

+9
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ This plugin supports these configuration options.
3131
| ciphers | array, ciphers list | No |
3232
| enable_token | boolean, one of [true, false]. default is false | No |
3333
| auth_plugin_params_String | string | No |
34+
| enable_async | boolean, one of [true, false]. default is false | No |
35+
36+
37+
## Sync vs Async
38+
39+
Sync is slower because it requires verification that a message is received. Sync supports exactly/effectively once messaging. This means there is a big network latency increase here.
40+
41+
Async is faster because it sends messages out and doesn't care if the message was received/processed or not. Async should only be used if you want "at most once" messaging and don't care if messages are lost.
42+
3443
# Example
3544
pulsar without tls & token
3645
```

src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ public class Pulsar implements Output {
9797
private static final PluginConfigSpec<String> CONFIG_AUTH_PLUGIN_PARAMS_STRING =
9898
PluginConfigSpec.stringSetting("auth_plugin_params_String","");
9999

100+
private static final PluginConfigSpec<Boolean> CONFIG_ENABLE_ASYNC =
101+
PluginConfigSpec.booleanSetting("enable_async",false);
102+
100103
private final CountDownLatch done = new CountDownLatch(1);
101104

102105
private final String producerName;
@@ -125,6 +128,9 @@ public class Pulsar implements Output {
125128
//Token
126129
private final boolean enableToken;
127130

131+
// Sends messages async if set, otherwise sync
132+
private final boolean enableSync;
133+
128134
// TODO: batchingMaxPublishDelay milliseconds
129135

130136
// TODO: sendTimeoutMs milliseconds 30000
@@ -148,6 +154,7 @@ public Pulsar(final String id, final Configuration configuration, final Context
148154

149155
enableTls = configuration.get(CONFIG_ENABLE_TLS);
150156
enableToken = configuration.get(CONFIG_ENABLE_TOKEN);
157+
enableAsync = configuration.get(CONFIG_ENABLE_ASYNC);
151158

152159
try {
153160
if(enableTls && enableToken){
@@ -222,15 +229,25 @@ public void output(final Collection<Event> events) {
222229
codec.encode(event, baos);
223230
String s = baos.toString();
224231
logger.debug("topic is {}, message is {}", eventTopic, s);
225-
getProducer(eventTopic).newMessage()
226-
.value(s.getBytes())
227-
.sendAsync();
232+
send(eventTopic)
228233
} catch (Exception e) {
229234
logger.error("fail to send message", e);
230235
}
231236
}
232237
}
233238

239+
private void send(String topic){
240+
if (enable_async){
241+
getProducer(topic).newMessage()
242+
.value(s.getBytes())
243+
.sendAsync();
244+
}else {
245+
getProducer(topic).newMessage()
246+
.value(s.getBytes())
247+
.send();
248+
}
249+
}
250+
234251
private org.apache.pulsar.client.api.Producer<byte[]> getProducer(String topic) throws PulsarClientException {
235252

236253
if(producerMap.containsKey(topic)){

0 commit comments

Comments
 (0)