diff --git a/README.md b/README.md index da01b43..711684d 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,15 @@ This plugin supports these configuration options. | ciphers | array, ciphers list | No | | enable_token | boolean, one of [true, false]. default is false | No | | auth_plugin_params_String | string | No | +| enable_async | boolean, one of [true, false]. default is false | No | + + +## Sync vs Async + +Sync is slower because it requires verification that a message is received. Sync supports exactly/effectively once messaging. This means there is a big network latency increase here. + +Async is faster because it sends messages out and doesn't care if the message was received/processed or not. Async should only be used if you want "at most once" messaging and don't care if messages are lost. + # Example pulsar without tls & token ``` diff --git a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java index bc978dc..48f2291 100644 --- a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java @@ -97,6 +97,9 @@ public class Pulsar implements Output { private static final PluginConfigSpec CONFIG_AUTH_PLUGIN_PARAMS_STRING = PluginConfigSpec.stringSetting("auth_plugin_params_String",""); + private static final PluginConfigSpec CONFIG_ENABLE_ASYNC = + PluginConfigSpec.booleanSetting("enable_async",false); + private final CountDownLatch done = new CountDownLatch(1); private final String producerName; @@ -125,6 +128,9 @@ public class Pulsar implements Output { //Token private final boolean enableToken; + // Sends messages async if set, otherwise sync + private final boolean enableAsync; + // TODO: batchingMaxPublishDelay milliseconds // TODO: sendTimeoutMs milliseconds 30000 @@ -148,6 +154,7 @@ public Pulsar(final String id, final Configuration configuration, final Context enableTls = configuration.get(CONFIG_ENABLE_TLS); enableToken = configuration.get(CONFIG_ENABLE_TOKEN); + enableAsync = configuration.get(CONFIG_ENABLE_ASYNC); try { if(enableTls && enableToken){ @@ -222,15 +229,25 @@ public void output(final Collection events) { codec.encode(event, baos); String s = baos.toString(); logger.debug("topic is {}, message is {}", eventTopic, s); - getProducer(eventTopic).newMessage() - .value(s.getBytes()) - .sendAsync(); + send(eventTopic, s); } catch (Exception e) { logger.error("fail to send message", e); } } } + private void send(String topic, String s){ + if (enableAsync){ + getProducer(topic).newMessage() + .value(s.getBytes()) + .sendAsync(); + }else { + getProducer(topic).newMessage() + .value(s.getBytes()) + .send(); + } + } + private org.apache.pulsar.client.api.Producer getProducer(String topic) throws PulsarClientException { if(producerMap.containsKey(topic)){