Skip to content

Commit 2e57a0a

Browse files
committed
Tweaked to include LISTENERS config more clearly
1 parent 102bac4 commit 2e57a0a

File tree

2 files changed

+8
-283
lines changed

2 files changed

+8
-283
lines changed

README.md

+1-254
Original file line numberDiff line numberDiff line change
@@ -1,256 +1,3 @@
11
# Kafka Listeners - Explained
22

3-
This question comes up on StackOverflow and such places a **lot**, so here's something to try and help.
4-
5-
**tl;dr** : You need to set `advertised.listeners` (or `KAFKA_ADVERTISED_LISTENERS` if you're using Docker images) to the external address (host/IP) so that clients can correctly connect to it. Otherwise they'll try to connect to the internal host address–and if that's not reachable then problems ensue.
6-
7-
## Is anyone listening?
8-
9-
Kafka is a distributed system. Data is read from & written to the _Leader_ for a given partition, which could be on any of the brokers in a cluster. When a client (producer/consumer) starts, it will request metadata about which broker is the leader for a partition—and it can do this from _any_ broker. The metadata returned will include the endpoints available for the Leader broker for that partition, and the client will then use those endpoints to connect to the broker to read/write data as required.
10-
11-
It's these endpoints that cause people trouble. On a *single machine, running 'bare metal'* (no VMs, no Docker), everything might be the hostname (or just *`localhost`*) and it's easy. But once you move into more complex networking setups, and multiple nodes, you have to pay more attention to it.
12-
13-
Let's assume you have more than one network. This could be things like:
14-
15-
- Docker internal network(s) plus host machine
16-
- Brokers in the cloud (eg. AWS EC2), and on-premises machines locally (or even in another cloud)
17-
18-
You need to tell Kafka how the brokers can reach each other, but also make sure that external clients (producers/consumers) can reach the broker they need to.
19-
20-
The key thing is that when you run a client, **the broker you pass to it is _just where it's going to go and get the metadata about brokers in the cluster from_**. The actual host & IP that it will connect to for reading/writing data is based on **_the data that the broker passes back in that initial connection_**—even if it's just a single node and the broker returned is the same as the one connected to.
21-
22-
For configuring this correctly, you need to understand that Kafka brokers can have multiple _listeners_. A listener is a combination of
23-
24-
1. Host/IP
25-
2. Port
26-
3. Protocol
27-
28-
Let's check out some config. Often the protocol is used for the listener name too, but here let's make it nice and clear by using abstract names for the listeners:
29-
30-
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
31-
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
32-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
33-
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
34-
35-
_I'm using the Docker config names—the equivalents if you're configuring `server.properties` directly (e.g. on AWS etc) are shown indented in the following list_
36-
37-
* `KAFKA_LISTENERS` is a comma-separated list of listeners, and the host/ip and port to which Kafka binds to on which to listen. For more complex networking this might be an IP address associated with a given network interface on a machine. The default is 0.0.0.0, which means listening on all interfaces.
38-
* `listeners`
39-
* `KAFKA_ADVERTISED_LISTENERS` is a comma-separated list of listeners with their the host/ip and port. This is the metadata that's passed back to clients.
40-
* `advertised.listeners`
41-
* `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` defines key/value pairs for the security protocol to use, per listener name.
42-
* `listener.security.protocol.map`
43-
44-
*Kafka brokers communicate between themselves*, usually on the internal network (e.g. Docker network, AWS VPC, etc). To define which listener to use, specify `KAFKA_INTER_BROKER_LISTENER_NAME` (`inter.broker.listener.name`). The host/IP used must be accessible from the broker machine to others.
45-
46-
Kafka _clients_ may well not be local to the broker's network, and this is where the additional listeners come in.
47-
48-
Each listener will, when connected to, report back the address on which it can be reached. _The address on which you reach a broker depends on the network used_. If you're connecting to the broker from an internal network it's going to be a different host/IP than when connecting externally.
49-
50-
When connecting to a broker, the listener that will be returned to the client will be the listener to which you connected (based on the port).
51-
52-
`kafkacat` is a useful tool for exploring this. Using `-L` you can see the metadata for the listener to which you connected. Based on the same listener config as above (`LISTENER_BOB` / `LISTENER_FRED`): -
53-
54-
* Connecting on port 9092 (which we map as `LISTENER_FRED`), the broker's address is given back as `localhost`
55-
56-
$ kafkacat -b kafka0:9092 \
57-
-L
58-
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
59-
1 brokers:
60-
broker 0 at localhost:9092
61-
62-
63-
* Connecting on port 29092 (which we map as `LISTENER_BOB`), the broker's address is given back as `kafka0`:
64-
65-
$ kafkacat -b kafka0:29092 \
66-
-L
67-
Metadata for all topics (from broker 0: kafka0:29092/0):
68-
1 brokers:
69-
broker 0 at kafka0:29092
70-
71-
72-
You can also use `tcpdump` to examine the traffic from a client connecting to the broker, and spot the hostname that's returned from the broker.
73-
74-
75-
## Why can I connect to the broker, but the client still fails?
76-
77-
*tl;dr* Even if you can make the initial connection to the broker, the address returned in the metadata may still be for a hostname that is not accessible from your client.
78-
79-
Let's walk this through step by step.
80-
81-
1. We've got a broker on AWS. We want to send a message to it from our laptop. We know the external hostname for the EC2 instance (`ec2-54-191-84-122.us-west-2.compute.amazonaws.com`). We've created the necessary entry in the security group to open the broker's port to our inbound traffic. We do smart things like checking that our local machine can connect to the port on the AWS instance:
82-
83-
$ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
84-
found 0 associations
85-
found 1 connections:
86-
1: flags=82<CONNECTED,PREFERRED>
87-
outif utun5
88-
src 172.27.230.23 port 53352
89-
dst 54.191.84.122 port 9092
90-
rank info not available
91-
TCP aux info available
92-
93-
Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
94-
95-
Things are looking good! We run:
96-
97-
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
98-
99-
Now…what happens next?
100-
101-
1. Our laptop resolves `ec2-54-191-84-122.us-west-2.compute.amazonaws.com` successfully (to the IP address 54.191.84.122), and connects to the AWS machine on port 9092
102-
103-
2. The broker receives the inbound connection on port 9092. *It returns the metadata to the client, with the hostname `ip-172-31-18-160.us-west-2.compute.internal`* because this is the host name of the broker and the default value for `listeners`.
104-
105-
3. The client the tries to send data to the broker using the metadata it was given. Since `ip-172-31-18-160.us-west-2.compute.internal` is not resolvable from the internet, it fails.
106-
107-
$ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
108-
>>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
109-
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
110-
111-
4. Puzzled, we try the same thing from the broker machine itself:
112-
113-
ec2-user@ip-172-31-18-160 ~> echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
114-
>>
115-
ec2-user@ip-172-31-18-160 ~>ec2-user@ip-172-31-18-160 ~> kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
116-
foo
117-
118-
It works fine! That's because regardless of the host/IP we connect to, we are connecting to port 9092, which is configured as the _internal_ listener, and thus reports back its hostname as `ip-172-31-18-160.us-west-2.compute.internal` which _is_ resolvable from the broker machine (it's its own hostname!)
119-
120-
1. We can make life even easier by using [`kafkacat`](https://docs.confluent.io/current/app-development/kafkacat-usage.html). Using the `-L` flag we can see the metadata returned by the broker:
121-
122-
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
123-
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
124-
1 brokers:
125-
broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
126-
127-
Clear as day, the _internal_ hostname is returned. This also makes this seemingly-confusing error make a lot more sense—connecting to one hostname, getting a lookup error on another:
128-
129-
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
130-
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
131-
132-
Here we're using `kafkacat` in producer mode (`-C`) from our local machine to try and read from the topic. As before, because we're getting the _internal_ listener hostname back from the broker in the metadata, the client cannot resolve that hostname to read/write from.
133-
134-
135-
136-
## I saw a StackOverflow answer suggesting to just update my hosts file…isn't that easier?
137-
138-
This is nothing more than a hack to workaround a mis-configuration, instead of actually fixing it.
139-
140-
If the broker is reporting back a hostname to which the client cannot connect, then hardcoding the hostname/IP combo into the local `/etc/hosts` may seem a nice fix. But this is a very brittle and manual solution. What happens when the IP changes, when you move hosts and forget to take the little hack with you, when other people want to do the same?
141-
142-
Much better is to understand and actually fix the `advertised.listeners` setting for your network.
143-
144-
## Docker example
145-
146-
![images/docker01.png](/content/images/2018/08/docker01.png)
147-
148-
Run within Docker, you will need to configure two listeners for Kafka:
149-
150-
1. Communication _within the Docker network_. This could be inter-broker communication (i.e. between brokers), and between other components running in Docker such as Kafka Connect, or third-party clients or producers.
151-
152-
For these comms, we need to use _the hostname of the Docker container(s)_. Each Docker container on the same Docker network will use the hostname of the Kafka broker container to reach it
153-
154-
2. Non-Docker network traffic. This could be clients running local on the Docker host machine, for example. The assumption is that they will connect on `localhost`, to a port exposed from the Docker container.
155-
156-
Here's the docker-compose snippet:
157-
158-
kafka0:
159-
image: "confluentinc/cp-enterprise-kafka:5.0.0-rc3"
160-
ports:
161-
- '9092:9092'
162-
depends_on:
163-
- zookeeper
164-
environment:
165-
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
166-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
167-
[…]
168-
169-
* Clients _within_ the Docker network connect using listener "BOB", with port 29092 and hostname `kafka0`. In doing so, they get back the hostname `kafka0` to which to connect. Each docker container will resolve `kafka0` using Docker's internal network, and be able to reach the broker.
170-
* Clients _external_ to the Docker network connect using listener "FRED", with port 9092 and hostname `localhost`. Port 9092 is exposed by the Docker container and so available to connect to. When clients connect, they are given the hostname `localhost` for the broker's metadata, and so connect to this when reading/writing data.
171-
* The above configuration would _not_ handle the scenario in which a client external to Docker _and_ external to the host machine wants to connect. This is because neither `kafka0` (the internal Docker hostname) _or_ `localhost` (the loopback address for the Docker host machine) would be resolvable.
172-
173-
## AWS/IaaS example
174-
175-
_I'm naming AWS because it's what the majority of people use, but this applies to any IaaS/Cloud solution._
176-
177-
Exactly the same concepts apply here as with Docker. The main difference is that whilst with Docker the external connections may well be just on localhost (as above), with Cloud-hosted Kafka (such as on AWS) the external connection will be from a machine not local to to the broker and which will need to be able to connect to the broker.
178-
179-
A further complication is that whilst Docker networks are heavily segregated from the host's, on IaaS often the _external_ hostname is resolvable _internally_, making it hit and miss when you may actually encounter these problems.
180-
181-
There are two approaches, depending on whether the external address through which you're going to connect to the broker is also resolvable locally to all of the brokers on the network (e.g VPC).
182-
183-
### Option 1 - external address IS resolvable locally
184-
185-
![images/aws01.png](/content/images/2018/08/aws01-1.png)
186-
187-
You can get by with one listener here. The existing listener, called `PLAINTEXT`, just needs overriding to set the advertised hostname (i.e. the one that is passed to inbound clients)
188-
189-
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
190-
191-
Now connections both internally and externally will use `ec2-54-191-84-122.us-west-2.compute.amazonaws.com` for connecting. Because `ec2-54-191-84-122.us-west-2.compute.amazonaws.com` can be resolved both locally and externally, things work fine.
192-
193-
### Option 2 - external address is NOT resolvable locally
194-
195-
You will need to configure two listeners for Kafka:
196-
197-
1. Communication _within the AWS network (VPC)_. This could be inter-broker communication (i.e. between brokers), and between other components running in the VPC such as Kafka Connect, or third-party clients or producers.
198-
199-
For these comms, we need to use _the internal IP of the EC2 machine_ (or hostname, if DNS is configured).
200-
201-
2. External AWS traffic. This could be testing connectivity from a laptop, or simply from machines not hosted in Amazon. In both cases, the external IP of the instance needs to be used (or hostname, if DNS is configured).
202-
203-
![images/aws02.png](/content/images/2018/08/aws02.png)
204-
205-
Here's an example configuration:
206-
207-
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
208-
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
209-
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
210-
inter.broker.listener.name=INTERNAL
211-
212-
## Exploring listeners with Docker
213-
214-
Take a look at https://github.com/rmoff/kafka-listeners. This includes a docker-compose to bring up a Zookeeper instance along with Kafka broker configured with several listeners.
215-
216-
* Listener `BOB` (port 29092) for internal traffic on the Docker network
217-
218-
$ docker run -t --network kafka-listeners_default \
219-
confluentinc/cp-kafkacat \
220-
kafkacat -b kafka0:29092 \
221-
-L
222-
Metadata for all topics (from broker 0: kafka0:29092/0):
223-
1 brokers:
224-
broker 0 at kafka0:29092
225-
226-
* Listener `FRED` (port 9092) for traffic from the Docker-host machine (`localhost`)
227-
228-
$ docker run -t --network kafka-listeners_default \
229-
confluentinc/cp-kafkacat \
230-
kafkacat -b kafka0:9092 \
231-
-L
232-
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
233-
1 brokers:
234-
broker 0 at localhost:9092
235-
236-
* Listener `ALICE` (port 29094) for traffic from outside, reaching the Docker host on the DNS name `never-gonna-give-you-up`
237-
238-
$ docker run -t --network kafka-listeners_default \
239-
confluentinc/cp-kafkacat \
240-
kafkacat -b kafka0:29094 \
241-
-L
242-
Metadata for all topics (from broker -1: kafka0:29094/bootstrap):
243-
1 brokers:
244-
broker 0 at never-gonna-give-you-up:29094
245-
246-
## References
247-
248-
* https://kafka.apache.org/documentation/#brokerconfigs
249-
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
250-
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-2+-+Refactor+brokers+to+allow+listening+on+multiple+ports+and+IPs
251-
* https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
252-
* https://stackoverflow.com/questions/42998859/kafka-server-configuration-listeners-vs-advertised-listeners
253-
254-
## Still not sure?
255-
256-
Check out the [Community Slack group](http://cnfl.io/slack) or [Apache Kafka user mailing list](https://lists.apache.org/[email protected]).
3+
See https://rmoff.net/2018/08/02/kafka-listeners-explained/

docker-compose.yml

+7-29
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
version: '2'
33
services:
44
zookeeper:
5-
image: "confluentinc/cp-zookeeper:5.0.0-rc3"
5+
image: "confluentinc/cp-zookeeper:5.2.1"
66
environment:
77
ZOOKEEPER_CLIENT_PORT: 2181
88
ZOOKEEPER_TICK_TIME: 2000
@@ -13,7 +13,7 @@ services:
1313
# ALICE for traffic from outside, reaching the Docker host on the DNS name `never-gonna-give-you-up`
1414
# Use
1515
kafka0:
16-
image: "confluentinc/cp-enterprise-kafka:5.0.0-rc3"
16+
image: "confluentinc/cp-enterprise-kafka:5.2.1"
1717
ports:
1818
- '9092:9092'
1919
- '29094:29094'
@@ -22,36 +22,14 @@ services:
2222
environment:
2323
KAFKA_BROKER_ID: 0
2424
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
25+
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://kafka0:9092,LISTENER_ALICE://kafka0:29094
2526
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092,LISTENER_ALICE://never-gonna-give-you-up:29094
2627
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,LISTENER_ALICE:PLAINTEXT
2728
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
28-
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
29-
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
29+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
3030
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3131
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
32-
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka0:29092
33-
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
34-
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
35-
CONFLUENT_METRICS_ENABLE: 'true'
36-
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
3732

38-
# kafka1:
39-
# image: "confluentinc/cp-enterprise-kafka:5.0.0-rc3"
40-
# ports:
41-
# - '19093:19093'
42-
# depends_on:
43-
# - zookeeper
44-
# environment:
45-
# KAFKA_BROKER_ID: 1
46-
# KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
47-
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
48-
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29093,PLAINTEXT_HOST://localhost:19093
49-
# KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
50-
# KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
51-
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
52-
# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
53-
# CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka1:29093
54-
# CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
55-
# CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
56-
# CONFLUENT_METRICS_ENABLE: 'true'
57-
# CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
33+
kafkacat:
34+
image: confluentinc/cp-kafkacat
35+
command: sleep infinity

0 commit comments

Comments
 (0)