
Commit 1188320

Examples: Add the Kafka example for the BulkIndexer helper

Related: elastic#137

1 parent aae4e81 commit 1188320

11 files changed: +901 −0 lines changed

_examples/bulk/README.md (+2)

@@ -55,3 +55,5 @@ indexer.Close(context.Background())
 ```
 
 Please refer to the [`benchmarks`](benchmarks) folder for performance tests with different types of payload.
+
+See the [`kafka`](kafka) folder for an end-to-end example of using the bulk helper for indexing data from a Kafka topic.
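
For orientation, here is a minimal, self-contained sketch of the `BulkIndexer` helper that the `benchmarks` and `kafka` examples build on; the index name and document body below are illustrative, not taken from this commit:

```go
package main

import (
	"context"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// Create the indexer; it batches added items and flushes them as bulk requests.
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client: es,
		Index:  "test", // illustrative index name
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	// Add a document; it will be sent as part of a bulk request.
	if err := indexer.Add(context.Background(), esutil.BulkIndexerItem{
		Action: "index",
		Body:   strings.NewReader(`{"title":"Test"}`),
	}); err != nil {
		log.Fatalf("Error adding item: %s", err)
	}

	// Close waits for all pending items to be flushed.
	if err := indexer.Close(context.Background()); err != nil {
		log.Fatalf("Error closing the indexer: %s", err)
	}
}
```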

_examples/bulk/kafka/.env (+7)

@@ -0,0 +1,7 @@
COMPOSE_PROJECT_NAME=kafka

CONFLUENT_VERSION=5.4.0
ELASTIC_VERSION=8.0.0-SNAPSHOT

KAFKA_MEMORY=1G
ES_MEMORY=1G

_examples/bulk/kafka/Makefile (+49)

@@ -0,0 +1,49 @@
SHELL := /bin/bash

test:
	go build -o /dev/null kafka.go

setup: stack status wait_for_kibana load_kibana_dashboard

clean:
	@docker-compose down --volumes

run:
	go run kafka.go

stack:
	@docker-compose up --detach

status:
	@{ \
	cols=`tput cols`; max=98; i=1; \
	while [[ "$$i" -lt "$$cols" && "$$i" -lt "$$max" ]]; do printf '-'; let i=$$i+1; done; printf "\n"; \
	docker-compose ps; \
	i=1; while [[ "$$i" -lt "$$cols" && "$$i" -lt "$$max" ]]; do printf '-'; let i=$$i+1; done; printf "\n"; \
	}

wait_for_kibana:
	@{ \
	printf "Waiting for Kibana..."; \
	until docker inspect kibana > /dev/null 2>&1 && [[ `docker inspect -f '{{ .State.Health.Status }}' kibana` == "healthy" ]]; do printf '.'; sleep 5; done; \
	printf "\nOpen dashboard at <http://localhost:5601/app/kibana#/dashboard/140b5490-5fce-11ea-a238-bf5970186390>\n"; \
	}

load_kibana_dashboard:
	@{ \
	curl -s -S -X POST -H 'kbn-xsrf: true' 'http://localhost:5601/api/saved_objects/_import?overwrite=true' --form file=@etc/kibana-objects.ndjson > /dev/null; \
	}

save_kibana_dashboard:
	@{ \
	curl -s -S -X POST -H 'Content-Type: application/json' -H 'kbn-xsrf: true' -o './etc/kibana-objects.ndjson' 'http://localhost:5601/api/saved_objects/_export' -d ' \
	{ \
	"objects": [ \
	{ "type": "index-pattern", "id": "stocks-index-pattern" }, \
	{ "type": "dashboard", "id": "140b5490-5fce-11ea-a238-bf5970186390" } \
	], \
	"includeReferencesDeep": true \
	}'; \
	}

.PHONY: test setup run clean stack status wait_for_kibana load_kibana_dashboard save_kibana_dashboard

_examples/bulk/kafka/README.md (+19)

@@ -0,0 +1,19 @@
# Example: Bulk indexing from a Kafka topic

This example demonstrates using the `BulkIndexer` component to ingest data consumed from a Kafka topic.

The provided `docker-compose.yml` file launches a realistic environment with Zookeeper, Kafka, Confluent Control Center, Elasticsearch and Kibana, and allows you to inspect data flows, view indexer metrics, and see the ingested data in a dashboard.

![Screenshot](screenshot.png)

First, launch the environment and wait until it's ready:

    make setup

Then, launch the Kafka producers and consumers and the Elasticsearch indexer:

    make run

Open the [_Kibana_ dashboard](http://localhost:5601/app/kibana#/dashboard/140b5490-5fce-11ea-a238-bf5970186390) to see the results, the [_Kibana_ APM application](http://localhost:5601/app/apm#/services/kafka/transactions?rangeFrom=now-15m&rangeTo=now&refreshPaused=true&refreshInterval=0&transactionType=indexing) to see the indexer metrics, and [_Confluent Control Center_](http://localhost:9021/) to inspect the Kafka cluster and see details about the topic and the performance of consumers.

See [`producer/producer.go`](producer/producer.go) for the Kafka producer, [`consumer/consumer.go`](consumer/consumer.go) for the Kafka consumer, and [`kafka.go`](kafka.go) for the main workflow. The default configuration launches one producer, four consumers, and one indexer, and sends 1,000 messages per second; see `go run kafka.go --help` for changing the defaults.
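
`producer/producer.go` itself is not included in this excerpt; as a rough sketch under stated assumptions (broker address, topic name, and payload are illustrative, not the actual producer code), a `segmentio/kafka-go` producer feeding such a topic could look like this:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Writer publishing JSON documents to the topic the example consumes.
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"}, // assumed broker address
		Topic:   "stocks",                   // assumed topic name
	})
	defer w.Close()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// One JSON document per message; the consumer passes msg.Value
		// straight to the BulkIndexer as the document body.
		err := w.WriteMessages(context.Background(),
			kafka.Message{Value: []byte(`{"symbol":"XYZ","price":42.0}`)},
		)
		if err != nil {
			log.Printf("producer: %s", err)
		}
	}
}
```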
_examples/bulk/kafka/consumer/consumer.go (+119)

@@ -0,0 +1,119 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package consumer

import (
	"bytes"
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"

	"go.elastic.co/apm"

	"github.com/elastic/go-elasticsearch/v8/esutil"
)

type Consumer struct {
	BrokerURL string
	TopicName string

	Indexer esutil.BulkIndexer
	reader  *kafka.Reader

	startTime     time.Time
	totalMessages int64
	totalErrors   int64
	totalBytes    int64
}

func (c *Consumer) Run(ctx context.Context) (err error) {
	if c.Indexer == nil {
		panic(fmt.Sprintf("%T.Indexer is nil", c))
	}
	c.startTime = time.Now()

	c.reader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{c.BrokerURL},
		GroupID: "go-elasticsearch-demo",
		Topic:   c.TopicName,
		// MinBytes: 1e+6, // 1MB
		// MaxBytes: 5e+6, // 5MB

		ReadLagInterval: 1 * time.Second,
	})

	for {
		msg, err := c.reader.ReadMessage(ctx)
		if err != nil {
			return fmt.Errorf("reader: %s", err)
		}
		// log.Printf("%v/%v/%v:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

		if err := c.Indexer.Add(ctx,
			esutil.BulkIndexerItem{
				Action: "create",
				Body:   bytes.NewReader(msg.Value),
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					// log.Printf("Indexed %s/%s", res.Index, res.DocumentID)
				},
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						apm.CaptureError(ctx, err).Send()
					} else {
						if res.Error.Type != "" {
							// log.Printf("%s:%s", res.Error.Type, res.Error.Reason)
							// apm.CaptureError(ctx, fmt.Errorf("%s:%s", res.Error.Type, res.Error.Reason)).Send()
						} else {
							// log.Printf("%s/%s %s (%d)", res.Index, res.DocumentID, res.Result, res.Status)
							// apm.CaptureError(ctx, fmt.Errorf("%s/%s %s (%d)", res.Index, res.DocumentID, res.Result, res.Status)).Send()
						}
					}
				},
			}); err != nil {
			apm.DefaultTracer.NewError(err).Send()
			return fmt.Errorf("indexer: %s", err)
		}
	}
	c.reader.Close()
	c.Indexer.Close(ctx)

	return nil
}

type Stats struct {
	Duration      time.Duration
	TotalLag      int64
	TotalMessages int64
	TotalErrors   int64
	TotalBytes    int64
	Throughput    float64
}

func (c *Consumer) Stats() Stats {
	if c.reader == nil || c.Indexer == nil {
		return Stats{}
	}

	duration := time.Since(c.startTime)
	readerStats := c.reader.Stats()

	c.totalMessages += readerStats.Messages
	c.totalErrors += readerStats.Errors
	c.totalBytes += readerStats.Bytes

	rate := float64(c.totalMessages) / duration.Seconds()

	return Stats{
		Duration:      duration,
		TotalLag:      readerStats.Lag,
		TotalMessages: c.totalMessages,
		TotalErrors:   c.totalErrors,
		TotalBytes:    c.totalBytes,
		Throughput:    rate,
	}
}
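
For context, a minimal sketch of how a `Consumer` like the one above might be wired to a `BulkIndexer`. The actual wiring lives in `kafka.go`, which is not shown here; the import path, index name, topic, and broker address are assumptions:

```go
package main

import (
	"context"
	"log"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"

	// Assumed import path for the consumer package defined above; adjust to the module layout.
	"github.com/elastic/go-elasticsearch/v8/_examples/bulk/kafka/consumer"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("client: %s", err)
	}

	// The Consumer panics if Indexer is nil, so the indexer is created first.
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     es,
		Index:      "stocks", // illustrative index name
		NumWorkers: 4,
	})
	if err != nil {
		log.Fatalf("indexer: %s", err)
	}

	c := consumer.Consumer{
		BrokerURL: "localhost:9092", // illustrative broker address
		TopicName: "stocks",         // illustrative topic name
		Indexer:   indexer,
	}

	// Run blocks, reading messages and feeding them to the bulk indexer.
	if err := c.Run(context.Background()); err != nil {
		log.Fatalf("consumer: %s", err)
	}
}
```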
_examples/bulk/kafka/docker-compose.yml (+129)

@@ -0,0 +1,129 @@
version: "3.7"

services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:${CONFLUENT_VERSION}
    networks:
      - kafka
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: '-Dzookeeper.4lw.commands.whitelist=ruok'
    healthcheck:
      test: echo "ruok" | nc localhost 2181 | grep "imok"

  kafka:
    container_name: kafka
    image: confluentinc/cp-server:${CONFLUENT_VERSION}
    depends_on:
      - zookeeper
    ports:
      # NOTE: Use kafka:29092 for connections within Docker
      - 9092:9092
    networks:
      - kafka
    volumes:
      - kafka-data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_HEAP_OPTS: '-Xms${KAFKA_MEMORY} -Xmx${KAFKA_MEMORY}'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
    restart: on-failure
    healthcheck:
      test: nc -z localhost 9092

  control-center:
    container_name: control-center
    image: confluentinc/cp-enterprise-control-center:${CONFLUENT_VERSION}
    hostname: control-center
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 9021:9021
    networks:
      - kafka
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    ulimits: { nofile: { soft: 16384, hard: 16384 } }
    healthcheck:
      test: curl --head --max-time 120 --retry 120 --retry-delay 1 --show-error --silent http://localhost:9021

  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}
    volumes:
      - es-data:/usr/share/elasticsearch/data
    networks:
      - elasticsearch
    ports:
      - 9200:9200
    environment:
      - node.name=elasticsearch
      - cluster.name=go-elasticsearch-kafka-demo
      - cluster.initial_master_nodes=elasticsearch
      - discovery.seed_hosts=elasticsearch
      - network.host=elasticsearch,_local_
      - network.publish_host=elasticsearch
      - bootstrap.memory_lock=true
      - ES_JAVA_OPTS=-Xms${ES_MEMORY} -Xmx${ES_MEMORY}
    ulimits: { nofile: { soft: 65535, hard: 65535 }, memlock: -1 }
    healthcheck:
      test: curl --head --max-time 120 --retry 120 --retry-delay 1 --show-error --silent http://localhost:9200

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:${ELASTIC_VERSION}
    depends_on: ['elasticsearch']
    networks:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - KIBANA_LOGGING_QUIET=true
    healthcheck:
      test: curl --max-time 120 --retry 120 --retry-delay 1 --show-error --silent http://localhost:5601

  apm_server:
    container_name: apm_server
    image: docker.elastic.co/apm/apm-server:${ELASTIC_VERSION}
    depends_on: ['elasticsearch', 'kibana']
    command: -e --strict.perms=false
    networks:
      - elasticsearch
    ports:
      - 8200:8200
    restart: on-failure
    healthcheck:
      test: curl --max-time 120 --retry 120 --retry-delay 1 --show-error --silent http://localhost:8200

networks:
  kafka:
  elasticsearch:

volumes:
  kafka-data:
  es-data:
