From ac44712d5044238c975c6acab73b575ded76e9d4 Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 13 Apr 2023 12:06:27 +0200 Subject: [PATCH 1/6] Add changes --- README.md | 9 ++++++++ .../pulsar/logstash/outputs/Pulsar.java | 23 ++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index da01b43..711684d 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,15 @@ This plugin supports these configuration options. | ciphers | array, ciphers list | No | | enable_token | boolean, one of [true, false]. default is false | No | | auth_plugin_params_String | string | No | +| enable_async | boolean, one of [true, false]. default is false | No | + + +## Sync vs Async + +Sync is slower because it requires verification that a message is received. Sync supports exactly/effectively once messaging. This means there is a big network latency increase here. + +Async is faster because it sends messages out and doesn't care if the message was received/processed or not. Async should only be used if you want "at most once" messaging and don't care if messages are lost. + # Example pulsar without tls & token ``` diff --git a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java index bc978dc..e2a61e7 100644 --- a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java @@ -97,6 +97,9 @@ public class Pulsar implements Output { private static final PluginConfigSpec CONFIG_AUTH_PLUGIN_PARAMS_STRING = PluginConfigSpec.stringSetting("auth_plugin_params_String",""); + private static final PluginConfigSpec CONFIG_ENABLE_ASYNC = + PluginConfigSpec.booleanSetting("enable_async",false); + private final CountDownLatch done = new CountDownLatch(1); private final String producerName; @@ -125,6 +128,9 @@ public class Pulsar implements Output { //Token private final boolean enableToken; + // Sends messages async if set, otherwise sync + private final boolean enableSync; + // TODO: batchingMaxPublishDelay milliseconds // TODO: sendTimeoutMs milliseconds 30000 @@ -148,6 +154,7 @@ public Pulsar(final String id, final Configuration configuration, final Context enableTls = configuration.get(CONFIG_ENABLE_TLS); enableToken = configuration.get(CONFIG_ENABLE_TOKEN); + enableAsync = configuration.get(CONFIG_ENABLE_ASYNC); try { if(enableTls && enableToken){ @@ -222,15 +229,25 @@ public void output(final Collection events) { codec.encode(event, baos); String s = baos.toString(); logger.debug("topic is {}, message is {}", eventTopic, s); - getProducer(eventTopic).newMessage() - .value(s.getBytes()) - .sendAsync(); + send(eventTopic) } catch (Exception e) { logger.error("fail to send message", e); } } } + private void send(String topic){ + if (enable_async){ + getProducer(topic).newMessage() + .value(s.getBytes()) + .sendAsync(); + }else { + getProducer(topic).newMessage() + .value(s.getBytes()) + .send(); + } + } + private org.apache.pulsar.client.api.Producer getProducer(String topic) throws PulsarClientException { if(producerMap.containsKey(topic)){ From ef090c87b370628e5b0db55b0006cc5a3b16768c Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 8 Jun 2023 12:01:03 +0200 Subject: [PATCH 2/6] Update src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java Co-authored-by: Zike Yang --- src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java index e2a61e7..666d448 100644 --- a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java @@ -129,7 +129,7 @@ public class Pulsar implements Output { private final boolean enableToken; // Sends messages async if set, otherwise sync - private final boolean enableSync; + private final boolean enableAsync; // TODO: batchingMaxPublishDelay milliseconds From 7b2f968c0fd39385203d7cd862a1c8cd0f77f02a Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 8 Jun 2023 12:01:08 +0200 Subject: [PATCH 3/6] Update src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java Co-authored-by: Zike Yang --- src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java index 666d448..f241b1f 100644 --- a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java @@ -237,7 +237,7 @@ public void output(final Collection events) { } private void send(String topic){ - if (enable_async){ + if (enableAsync){ getProducer(topic).newMessage() .value(s.getBytes()) .sendAsync(); From 882ff0235289074fdcdf8bd0e83fbc587b2e7f5d Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 8 Jun 2023 12:01:58 +0200 Subject: [PATCH 4/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 711684d..440dd71 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ This plugin supports these configuration options. | ciphers | array, ciphers list | No | | enable_token | boolean, one of [true, false]. default is false | No | | auth_plugin_params_String | string | No | -| enable_async | boolean, one of [true, false]. default is false | No | +| enableAsync | boolean, one of [true, false]. default is false | No | ## Sync vs Async From 7c1bc75495cb8a8ead16d92f035533b59df93b57 Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 8 Jun 2023 12:03:33 +0200 Subject: [PATCH 5/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 440dd71..711684d 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ This plugin supports these configuration options. | ciphers | array, ciphers list | No | | enable_token | boolean, one of [true, false]. default is false | No | | auth_plugin_params_String | string | No | -| enableAsync | boolean, one of [true, false]. default is false | No | +| enable_async | boolean, one of [true, false]. default is false | No | ## Sync vs Async From 2fb1f80caa51dcd05b49e03723738295a148c80c Mon Sep 17 00:00:00 2001 From: Andreas Becker Bertelsen Date: Thu, 8 Jun 2023 12:08:00 +0200 Subject: [PATCH 6/6] Update send function to also include s as param to avoid runtime errors --- src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java index f241b1f..48f2291 100644 --- a/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java @@ -229,14 +229,14 @@ public void output(final Collection events) { codec.encode(event, baos); String s = baos.toString(); logger.debug("topic is {}, message is {}", eventTopic, s); - send(eventTopic) + send(eventTopic, s); } catch (Exception e) { logger.error("fail to send message", e); } } } - private void send(String topic){ + private void send(String topic, String s){ if (enableAsync){ getProducer(topic).newMessage() .value(s.getBytes())