Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unbuffered string error #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:14-alpine
FROM node:18-alpine
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

MAINTAINER Giuseppe Mandato <[email protected]>

ENV PYTHONUNBUFFERED=1
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version: "3.9"

services:
kafka-producer:
# build: .
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest we'd avoid commented out code :)

image: hitmands/kafka-producer-stub:latest
environment:
HKPS_BROKERS: "kafka:19092"
Expand Down
1 change: 1 addition & 0 deletions lib/dispatcher.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const createDispatcher = (clientId, producer) => ({
{ partition = null, key = null, ts = Date.now() } = {}
) => {
console.log(`${clientId} / Producer.Dispatcher.Dispatch('${topic}')`);
console.log("message", message.toString());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might lead to a very verbose log, we should put it under a debug condition or something.

if (LOG_LEVEL === 'debug') {
  // do verbose logging here
}


return producer.produce(topic, partition, message, null, ts);
},
Expand Down
2 changes: 1 addition & 1 deletion lib/producer.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const start = async ({ clientId, messenger }) => {
console.log(`${clientId} / Producer.Ready`);

for await (const { topic, message, options } of messenger()) {
dispatcher.dispatch(topic, message, options);
dispatcher.dispatch(topic, Buffer(JSON.stringify(message)), options);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design, we wanted to just dispatch the message as is.
The string was supposed to be buffered in the message producer, as listed here:

Could you elaborate a bit more on why you would want to change this behaviour?

image

Copy link
Author

@Dominick-Peluso-Bose Dominick-Peluso-Bose Jul 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Well I made this change because the code that's in main (and also what was in Docker hub from the "Using the docker-compose of this repo" commands) don't seem to do what's described there, which results in this error when you run the containers:

kafka-producer_1  | @hitmands/[email protected] / Producer.Ready
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | /kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139
kafka-producer_1  |     this._client.produce(topic, partition, message, key, timestamp, opaque, headers));
kafka-producer_1  |                  ^
kafka-producer_1  | 
kafka-producer_1  | Error: Message must be a buffer or null
kafka-producer_1  |     at Producer.produce (/kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139:18)
kafka-producer_1  |     at Object.dispatch (file:///kafka-producer-stub/lib/dispatcher.mjs:9:21)
kafka-producer_1  |     at Producer.<anonymous> (file:///kafka-producer-stub/lib/producer.mjs:26:20)
kafka-producer_1  | 
kafka-producer_1  | Node.js v18.4.0

I see you are already aware of the fix and did so in a different place but maybe it just didn't make its way back to main. No worries.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a reasonable change to add, messages must be buffers, hence it makes sense to toBuffer internally.

I'd suggest we'd use Buffer.from as factory to create a buffer object out of a string.

}

console.log(`${clientId} / Producer.Done`);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"dependencies": {
"faker": "^5.4.0",
"node-rdkafka": "^2.10.1",
"node-rdkafka": "^2.13.0",
"ramda": "^0.27.1"
}
}
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ nan@^2.14.0:
resolved "https://registry.yarnpkg.com/nan/-/nan-2.14.2.tgz#f5376400695168f4cc694ac9393d0c9585eeea19"
integrity sha512-M2ufzIiINKCuDfBSAUr1vWQ+vuVcA9kqx8JJUsbQi6yf1uGRyb7HfpdfUr5qLXf3B/t8dPvcjhKMmlfnP47EzQ==

node-rdkafka@^2.10.1:
version "2.10.1"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.10.1.tgz#de6055a8aed6315a68f5b401415e7d6b9bfcc631"
integrity sha512-yRb9Y90ipef4X+S/UbvQedUNtKZONa9RR6hCpAaGD83NqUga/uxTofdRQG8bm7SEh/DNuaifIJjRzLcoG9nUSQ==
node-rdkafka@^2.13.0:
version "2.13.0"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.13.0.tgz#393b5db9dd40b19df8eb8b85a160402bd8046150"
integrity sha512-CGdURr+gapPxDT5AuWS5vyXh341vUXYQxyX5Ly3sJaSxUDnG+cE4ZCAcE6NZ79AUo7O6S3UH6c5bFRdPdTWpaw==
dependencies:
bindings "^1.3.1"
nan "^2.14.0"
Expand Down