This directory includes examples of Kafka client applications that connect to StreamNative Cloud.
Please ensure poetry is installed. If not, follow the instructions here.
Install the dependencies:
poetry install
Please fill in the sncloud.ini file before running the examples. Then you can run poetry run python xxx.py to execute the xxx.py script.
The common section contains:
bootstrap.servers: The URL to connect to the Kafka protocol endpoint. You can get it from the StreamNative Cloud console; it is usually in the format pc-xxx:9093.
topic: The topic name.
token: The token used to authenticate with both the Kafka protocol endpoint and the Kafka Schema Registry. You can get it from the StreamNative Cloud console by creating a new API key.
The consumer section contains:
group.id: The consumer group ID.
The schema.registry section contains:
url: The URL to connect to the Kafka Schema Registry. You can get it from the StreamNative Cloud console; it is usually in the format https://pc-xxx/kafka.
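As a rough illustration of how the three sections fit together, the sketch below parses an sncloud.ini-style file with Python's standard configparser and builds a client settings dictionary. The SAMPLE values are placeholders, the helper names parse_ini and client_settings are hypothetical, and the SASL settings (SASL_SSL, PLAIN, a placeholder username, and a password of the form token:<token>) are assumptions about how token authentication is wired up, not something this README states.

```python
import configparser

# Placeholder values only; the real ones come from the StreamNative Cloud console.
SAMPLE = """
[common]
bootstrap.servers = pc-xxx:9093
topic = my-topic
token = my-api-key

[consumer]
group.id = my-group

[schema.registry]
url = https://pc-xxx/kafka
"""


def parse_ini(text: str) -> configparser.ConfigParser:
    """Parse INI text; interpolation is disabled so a '%' in a token is kept literally."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return parser


def client_settings(parser: configparser.ConfigParser) -> dict:
    """Build a Kafka client settings dict from the [common] section."""
    common = parser["common"]
    return {
        "bootstrap.servers": common["bootstrap.servers"],
        # Assumption: token authentication uses SASL/PLAIN over TLS.
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # Assumption: the username is a placeholder and the password carries the token.
        "sasl.username": "unused",
        "sasl.password": "token:" + common["token"],
    }


parser = parse_ini(SAMPLE)
settings = client_settings(parser)
```

To read the real file instead of the sample, pass the contents of sncloud.ini to parse_ini, for example parse_ini(open("sncloud.ini").read()).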
producer.py: Sends 10 messages to the topic.
consumer.py: Reads messages from the topic and exits after receiving an interrupt signal (e.g., when you press Ctrl+C).
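The read-until-interrupted behaviour described for consumer.py can be sketched as a poll loop that stops on KeyboardInterrupt. This is not the script's actual code: the function name consume_until_interrupted is hypothetical, and it assumes a consumer object with the subscribe, poll, and close methods and messages with error and value methods, as offered by the confluent-kafka Python client.

```python
def consume_until_interrupted(consumer, topic, handle=print, poll_timeout=1.0):
    """Poll `topic` until Ctrl+C, calling `handle` on each good message.

    Returns the number of messages passed to `handle`.
    """
    handled = 0
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(poll_timeout)
            if msg is None:
                # Nothing arrived within the timeout; keep waiting.
                continue
            if msg.error():
                # Report the error and keep consuming.
                print(f"Consumer error: {msg.error()}")
                continue
            handle(msg)
            handled += 1
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt in the main thread; treat it as a normal exit.
        pass
    finally:
        # Always close so the consumer leaves its group cleanly.
        consumer.close()
    return handled
```

With confluent-kafka installed, a consumer could be created as Consumer({...client settings..., "group.id": ...}) and passed in, with handle set to something like lambda m: print(m.value()).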
The Avro examples use the following schema:
{
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}
and the User Python class definition:
class User:
    def __init__(self, name: str = None, age: int = 0):
        self.name = name
        self.age = age
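To connect the schema with the class, an Avro serializer needs a way to turn a User into a plain dict whose keys match the schema fields, and a deserializer needs the reverse. The sketch below assumes the confluent-kafka AvroSerializer and AvroDeserializer, which accept to_dict and from_dict callables that receive the object and a serialization context; the function names user_to_dict and dict_to_user are hypothetical and may not match the example scripts.

```python
class User:
    def __init__(self, name: str = None, age: int = 0):
        self.name = name
        self.age = age


def user_to_dict(user, ctx=None):
    """Convert a User to a dict whose keys match the Avro schema fields.

    `ctx` is the serialization context passed by the serializer; unused here.
    """
    return {"name": user.name, "age": user.age}


def dict_to_user(obj, ctx=None):
    """Convert a decoded Avro record (a dict) back into a User."""
    if obj is None:
        # A message with no value decodes to None; pass it through.
        return None
    return User(name=obj["name"], age=obj["age"])


alice = User(name="Alice", age=18)
record = user_to_dict(alice)
restored = dict_to_user(record)
```

Under that assumption, these callables would be passed as the to_dict argument of AvroSerializer and the from_dict argument of AvroDeserializer, alongside the schema string and a Schema Registry client.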
avro_producer.py: Sends a message (User: {"name": "Alice", "age": 18}) with the Avro schema to the topic.
avro_consumer.py: Reads messages from the topic and exits after receiving an interrupt signal (e.g., when you press Ctrl+C). All messages are parsed into the User class; invalid messages are skipped.
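The skip-invalid-messages behaviour can be sketched as a generator that attempts to decode each payload and moves on when decoding or conversion fails. To keep the sketch runnable without a Schema Registry, the usage lines use json.loads as a stand-in decoder; that substitution, the helper name parse_valid_users, and the choice of exceptions to catch are all assumptions for illustration, and the real script would decode Avro instead.

```python
import json


class User:
    def __init__(self, name: str = None, age: int = 0):
        self.name = name
        self.age = age


def dict_to_user(obj):
    """Build a User from a decoded record; raises if fields are missing."""
    return User(name=obj["name"], age=obj["age"])


def parse_valid_users(payloads, decode):
    """Yield a User for each payload that decodes cleanly; skip the rest.

    `decode` is any callable that turns raw bytes into a dict
    (a stand-in for an Avro deserializer in this sketch).
    """
    for payload in payloads:
        try:
            user = dict_to_user(decode(payload))
        except (KeyError, TypeError, ValueError) as exc:
            # Undecodable bytes, wrong shape, or missing fields: skip the message.
            print(f"Skipping invalid message: {exc!r}")
            continue
        yield user


payloads = [
    b'{"name": "Alice", "age": 18}',
    b"not a record",        # cannot be decoded
    b'{"name": "Bob"}',     # missing the "age" field
]
users = list(parse_valid_users(payloads, json.loads))
```

Only the first payload yields a User here; the other two are reported and skipped, so one bad message does not stop the consumer.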