This directory includes examples of Kafka client applications connect to StreamNative Cloud, showcasing producers, consumers, transactions, and other admin API usage, written using Confluent Golang Client for Apache Kafka.
You can follow Kafka Go Client Guide to get started with the Kafka Go client to produce and consume messages to StreamNative Cloud.
-
Get the bootstrap servers and schema registry URL. Note down the bootstrap servers and schema registry URL as you'll need them in the next steps.
-
Create a service account and get the API key. For simplicity, you can create a super-user service account to run all the examples without worrying about authorization settings.
-
Clone the repo.
git clone https://github.com/streamnative/examples.git
-
Enter the
examples/cloud_kafka/confluent-kafka-go
directory.cd examples/cloud_kafka/confluent-kafka-go
-
Build the examples.
sh build.sh
In the terminal you are running the examples from, export the following environment variables with the values for your StreamNative Cloud cluster.
export BOOTSTRAP_SERVERS="<your-bootstrap-servers>"
export SCHEMA_REGISTRY="<your-schema-registry-url>"
export API_KEY="<your-api-key>"
-
Create a topic.
./admin_create_topic/admin_create_topic $BOOTSTRAP_SERVERS $API_KEY basic_users 4 3
The example will create a topic named
basic_users
with 4 partitions and 3 replicas. See source code admin_create_topic/admin_create_topic.go for more details. -
Describe the topic.
./admin_describe_topics/admin_describe_topics $BOOTSTRAP_SERVERS $API_KEY false basic_users
You should see the topic
basic_users
with 4 partitions. See source code admin_describe_topics/admin_describe_topics.go for more details. -
Produce messages to the topic.
./producer_example/producer_example $BOOTSTRAP_SERVERS $API_KEY basic_users
The examples will produce 3 messages to the topic. You will see the similar output to the following:
Created Producer rdkafka#producer-1 Delivered message to topic basic_users [3] at offset 0 Delivered message to topic basic_users [0] at offset 0 Delivered message to topic basic_users [2] at offset 0
See source code producer_example/producer_example.go for more details.
-
Consume messages from the topic.
./consumer_example/consumer_example $BOOTSTRAP_SERVERS $API_KEY sub basic_users
This example will consume messages from the topic with a subscription name
sub
. You should see the similar output to the following:Created Consumer rdkafka#consumer-1 % Message on basic_users[0]@0: Producer example, message #1 % Headers: [myTestHeader="header values are binary"] % Message on basic_users[3]@0: Producer example, message #0 % Headers: [myTestHeader="header values are binary"] % Message on basic_users[2]@0: Producer example, message #2 % Headers: [myTestHeader="header values are binary"] Ignored OffsetsCommitted (<nil>, [basic_users[0]@1 basic_users[1]@unset basic_users[2]@1 basic_users[3]@1])
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.See source code consumer_example/consumer_example.go for more details.
-
Get the EARLIEST and LATEST offsets for the topic.
You can get earliest offset of a topic partition by running the following command:
./admin_list_offsets/admin_list_offsets $BOOTSTRAP_SERVERS $API_KEY basic_users 1 EARLIEST
You should see the output like the following:
Topic: basic_users Partition: 1 Offset: 0 Timestamp: 0
You can get latest offset of a topic partition by running the following command:
./admin_list_offsets/admin_list_offsets $BOOTSTRAP_SERVERS $API_KEY basic_users 1 LATEST
You should see the output like the following:
Topic: basic_users Partition: 1 Offset: 0 Timestamp: 0
See source code admin_list_offsets/admin_list_offsets.go for more details.
-
List the consumer groups.
./admin_list_consumer_groups/admin_list_consumer_groups -b $BOOTSTRAP_SERVERS -api-key $API_KEY
You should see the output like the following:
A total of 1 consumer group(s) listed: GroupId: sub State: Empty Type: Unknown IsSimpleConsumerGroup: false
See source code admin_list_consumer_groups/admin_list_consumer_groups.go for more details.
-
List the consumer group offsets.
./admin_list_consumer_group_offsets/admin_list_consumer_group_offsets $BOOTSTRAP_SERVERS $API_KEY sub false basic_users 1
See source code admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go for more details.
-
Describe the consumer group.
./admin_describe_consumer_groups/admin_describe_consumer_groups $BOOTSTRAP_SERVERS $API_KEY false sub
This example will describe the consumer group
sub
. See source code admin_describe_consumer_groups/admin_describe_consumer_groups.go for more details. -
Delete the consumer group.
./admin_delete_consumer_groups/admin_delete_consumer_groups $BOOTSTRAP_SERVERS $API_KEY 60 sub
This example will delete the consumer group
sub
with 60 seconds timeout. See source code admin_delete_consumer_groups/admin_delete_consumer_groups.go for more details.After the consumer group is deleted, the consumer group
sub
will no longer be listed. You can verify this by running theadmin_list_consumer_groups
example again../admin_list_consumer_groups/admin_list_consumer_groups -b $BOOTSTRAP_SERVERS -api-key $API_KEY
-
Delete the records from the topic.
./admin_delete_records/admin_delete_records $BOOTSTRAP_SERVERS $API_KEY basic_users 0 1
The example will delete the records from the topic
basic_users
partition 0 offset 1.You should see the output like the following:
Delete records result for topic basic_users partition: 0 Delete records succeeded New low-watermark: 0
See source code admin_delete_records/admin_delete_records.go for more details.
-
Delete the topic.
./admin_delete_topics/admin_delete_topics $BOOTSTRAP_SERVERS $API_KEY basic_users
This example will delete the topic
basic_users
. See source code admin_delete_topics/admin_delete_topics.go for more details.After the topic is deleted, the topic
basic_users
will no longer be listed. You can verify this by running theadmin_describe_topics
example again../admin_describe_topics/admin_describe_topics $BOOTSTRAP_SERVERS $API_KEY false basic_users
You will see the following output:
A total of 1 topic(s) described: Topic: basic_users has error: Broker: Unknown topic or partition
-
Produce generic AVRO messages.
./avro_generic_producer_example/avro_generic_producer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY generic_avro_users
This example produce 1 message to the topic
generic_avro_users
. See source code avro_generic_producer_example/avro_generic_producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic generic_avro_users [0] at offset 0
-
Consume generic AVRO messages.
./avro_generic_consumer_example/avro_generic_consumer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY sub generic_avro_users
This example will consume messages from the topic
generic_avro_users
with a subscription namesub
. See source code avro_generic_consumer_example/avro_generic_consumer_example.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 % Message on generic_avro_users[0]@0: {Name:First user FavoriteNumber:42 FavoriteColor:blue} % Headers: [myTestHeader="header values are binary"]
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.
-
Produce specific AVRO messages.
./avro_specific_producer_example/avro_specific_producer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY specific_avro_users
This example produce 1 message to the topic
specific_avro_users
. See source code avro_specific_producer_example/avro_specific_producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic specific_avro_users [0] at offset 0
-
Consume specific AVRO messages.
./avro_specific_consumer_example/avro_specific_consumer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY sub specific_avro_users
This example will consume messages from the topic
specific_avro_users
with a subscription namesub
. See source code avro_specific_consumer_example/avro_specific_consumer_example.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 % Message on specific_avro_users[0]@0: {Name:First user Favorite_number:42 Favorite_color:blue} % Headers: [myTestHeader="header values are binary"] Ignored OffsetsCommitted (<nil>, [specific_avro_users[0]@1])
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.
-
Produce Avro v2 messages.
./avrov2_producer_example/avrov2_producer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY avrov2_users
This example produce 1 message to the topic
avrov2_users
. See source code avrov2_producer_example/avrov2_producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic avrov2_users [0] at offset 0
-
Consume Avro v2 messages.
./avrov2_consumer_example/avrov2_consumer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY sub avrov2_users
This example will consume messages from the topic
avrov2_users
with a subscription namesub
. See source code avrov2_consumer_example/avrov2_consumer_example.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 % Message on avrov2_users[0]@0: {Name:First user FavoriteNumber:42 FavoriteColor:blue} % Headers: [myTestHeader="header values are binary"] Ignored OffsetsCommitted (<nil>, [avrov2_users[0]@1])
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.
-
Produce JSON messages.
./json_producer_example/json_producer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY json_users
This example produce 1 message to the topic
json_users
. See source code json_producer_example/json_producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic json_users [0] at offset 0
-
Consume JSON messages.
./json_consumer_example/json_consumer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY sub json_users
This example will consume messages from the topic
json_users
with a subscription namesub
. See source code json_consumer_example/json_consumer_example.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 % Message on json_users[0]@0: {Name:First user FavoriteNumber:42 FavoriteColor:blue} % Headers: [myTestHeader="header values are binary"] Ignored OffsetsCommitted (<nil>, [json_users[0]@1])
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.
-
Produce Protobuf messages.
./protobuf_producer_example/protobuf_producer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY protobuf_users
This example produce 1 message to the topic
protobuf_users
. See source code protobuf_producer_example/protobuf_producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic protobuf_users [0] at offset 0
-
Consume Protobuf messages.
./protobuf_consumer_example/protobuf_consumer_example $BOOTSTRAP_SERVERS $SCHEMA_REGISTRY $API_KEY sub protobuf_users
This example will consume messages from the topic
protobuf_users
with a subscription namesub
. See source code protobuf_consumer_example/protobuf_consumer_example.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 % Message on protobuf_users[0]@0: name:"First user" favorite_number:42 favorite_color:"blue" % Headers: [myTestHeader="header values are binary"] Ignored OffsetsCommitted (<nil>, [protobuf_users[0]@1])
Once you are done with the consumer, enter
Ctrl-C
to terminate the consumer application.
-
Produce messages to the topic.
./producer_example/producer_example $BOOTSTRAP_SERVERS $API_KEY basic_users_metadata
This example will produce 3 messages to the topic
basic_users_metadata
. See source code producer_example/producer_example.go for more details.You should see the following output:
Created Producer rdkafka#produce-1 Delivered message to topic basic_users_metadata [0] at offset 0 Delivered message to topic basic_users_metadata [0] at offset 1 Delivered message to topic basic_users_metadata [0] at offset 2
-
Commit offset with metadata.
./consumer_offset_metadata/consumer_offset_metadata $BOOTSTRAP_SERVERS $API_KEY sub basic_users_metadata 0 2 "my_metadata"
This example will commit the offset 2 with metadata
my_metadata
for the topicbasic_users_metadata
partition 0. See source code consumer_offset_metadata/consumer_offset_metadata.go for more details.You should see the following output:
Created Consumer rdkafka#consumer-1 Committing offset 2 with metadata my_metadata Partition 0 offset committed successfully
You can verify the metadata by running the following command:
./consumer_offset_metadata/consumer_offset_metadata $BOOTSTRAP_SERVERS $API_KEY sub basic_users_metadata 0
You should see the following output:
Created Consumer rdkafka#consumer-1 Committed partition 0 offset: 2 metadata: my_metadata
Please refer to transactions_example for more details.
-
Run the verifiable producer.
./go_verifiable_producer/go_verifiable_producer --topic=verifiable_topic --broker-list=$BOOTSTRAP_SERVERS --api-key=$API_KEY --throughput=1000 --max-messages=10000
This example will produce 10000 messages to the topic
verifiable_topic
with a throughput of 1000 messages per second. -
Run the verifiable consumer.
./go_verifiable_consumer/go_verifiable_consumer --group-id=verifier --topic=verifiable_topic --broker-list=$BOOTSTRAP_SERVERS --api-key=$API_KEY --session-timeout=60000 --report-interval-time=10000 --report-interval-msgs=1000
This example will consume messages from the topic
verifiable_topic
with a consumer groupverifier
and report the consumption progress every 10000 milliseconds and every 1000 messages.