Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unbuffered string error #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

Dominick-Peluso-Bose
Copy link

@Dominick-Peluso-Bose Dominick-Peluso-Bose commented Jun 23, 2022

This repo seems really handy but I had some trouble getting it running in its default configuration, so I wanted to share what I did to get it running.

Changes:

  • Upgrade to Node 18
  • Fixes an issue with the producer sending unbuffered strings
  • Added a log line to show the message contents

Before this change:

kafka-producer_1  | @hitmands/[email protected] / Producer.Ready
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | /kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139
kafka-producer_1  |     this._client.produce(topic, partition, message, key, timestamp, opaque, headers));
kafka-producer_1  |                  ^
kafka-producer_1  | 
kafka-producer_1  | Error: Message must be a buffer or null
kafka-producer_1  |     at Producer.produce (/kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139:18)
kafka-producer_1  |     at Object.dispatch (file:///kafka-producer-stub/lib/dispatcher.mjs:9:21)
kafka-producer_1  |     at Producer.<anonymous> (file:///kafka-producer-stub/lib/producer.mjs:26:20)
kafka-producer_1  | 
kafka-producer_1  | Node.js v18.4.0

After this change:

Recreating kafka-producer-stub-orig_kafka-producer_1 ... done
Attaching to kafka-producer-stub-orig_kafka-producer_1
kafka-producer_1  | @hitmands/[email protected] / Producer.CreateInstance {
kafka-producer_1  |   'client.id': '@hitmands/[email protected]',
kafka-producer_1  |   'metadata.broker.list': 'kafka:19092'
kafka-producer_1  | }
kafka-producer_1  | @hitmands/[email protected] / Producer.Ready
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":1,"hash":"674a59c015f8f","timestamp":1655989809364}
kafka-producer_1  | (node:1) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.
kafka-producer_1  | (Use `node --trace-deprecation ...` to show where the warning was created)
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":2,"hash":"dcf2349225f55","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":3,"hash":"e5379ebd1f5c","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":4,"hash":"68662ca34734a","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":5,"hash":"afbeae45deb5d","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":6,"hash":"cc963c169dba6","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":7,"hash":"31ca9ea5c47aa","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":8,"hash":"043bebf2ce6fc","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":9,"hash":"d2df33ce5cb8c","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":10,"hash":"d919a01ffa26c","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":11,"hash":"1fa6ec28c0f45","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":12,"hash":"fffc407286de3","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":13,"hash":"412e5342e91e1","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":14,"hash":"85cdf4763f707","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":15,"hash":"23d655ce6b215","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":16,"hash":"45cabfc7a29af","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":17,"hash":"2395968b129f3","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":18,"hash":"339730ab90558","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":19,"hash":"04e72629cb1","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":20,"hash":"c68505e3ea11b","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":21,"hash":"c3125002e0259","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":22,"hash":"cc49c193bd2d1","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":23,"hash":"c1e1299d757f2","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":24,"hash":"346518739bed3","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":25,"hash":"5dcaa9509dc4e","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":26,"hash":"0fbabccc2a725","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":27,"hash":"8a9199b1c34a2","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":28,"hash":"b8137a7a703b6","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":29,"hash":"0d1a447f317db","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | message {"id":30,"hash":"50a8efcb43169","timestamp":1655989809364}
kafka-producer_1  | @hitmands/[email protected] / Producer.Done
kafka-producer-stub-orig_kafka-producer_1 exited with code 0

@Dominick-Peluso-Bose Dominick-Peluso-Bose changed the title Get running Fix unbuffered string error Jun 23, 2022
@@ -23,7 +23,7 @@ export const start = async ({ clientId, messenger }) => {
console.log(`${clientId} / Producer.Ready`);

for await (const { topic, message, options } of messenger()) {
dispatcher.dispatch(topic, message, options);
dispatcher.dispatch(topic, Buffer(JSON.stringify(message)), options);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design, we wanted to just dispatch the message as is.
The string was supposed to be buffered in the message producer, as listed here:

Could you elaborate a bit more on why you would want to change this behaviour?

image

Copy link
Author

@Dominick-Peluso-Bose Dominick-Peluso-Bose Jul 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Well I made this change because the code that's in main (and also what was in Docker hub from the "Using the docker-compose of this repo" commands) don't seem to do what's described there, which results in this error when you run the containers:

kafka-producer_1  | @hitmands/[email protected] / Producer.Ready
kafka-producer_1  | @hitmands/[email protected] / Producer.Dispatcher.Dispatch('stub-topic')
kafka-producer_1  | /kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139
kafka-producer_1  |     this._client.produce(topic, partition, message, key, timestamp, opaque, headers));
kafka-producer_1  |                  ^
kafka-producer_1  | 
kafka-producer_1  | Error: Message must be a buffer or null
kafka-producer_1  |     at Producer.produce (/kafka-producer-stub/node_modules/node-rdkafka/lib/producer.js:139:18)
kafka-producer_1  |     at Object.dispatch (file:///kafka-producer-stub/lib/dispatcher.mjs:9:21)
kafka-producer_1  |     at Producer.<anonymous> (file:///kafka-producer-stub/lib/producer.mjs:26:20)
kafka-producer_1  | 
kafka-producer_1  | Node.js v18.4.0

I see you are already aware of the fix and did so in a different place but maybe it just didn't make its way back to main. No worries.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a reasonable change to add, messages must be buffers, hence it makes sense to toBuffer internally.

I'd suggest we'd use Buffer.from as factory to create a buffer object out of a string.

@@ -1,4 +1,4 @@
FROM node:14-alpine
FROM node:18-alpine
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

@@ -2,6 +2,7 @@ version: "3.9"

services:
kafka-producer:
# build: .
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest we'd avoid commented out code :)

@@ -5,6 +5,7 @@ export const createDispatcher = (clientId, producer) => ({
{ partition = null, key = null, ts = Date.now() } = {}
) => {
console.log(`${clientId} / Producer.Dispatcher.Dispatch('${topic}')`);
console.log("message", message.toString());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might lead to a very verbose log, we should put it under a debug condition or something.

if (LOG_LEVEL === 'debug') {
  // do verbose logging here
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants