Commit 5936d0b

Matt Howlett and pranavrth authored
Serialization Stabilization (#1426)
* tidy up json serde (#1340)
* tidy up json serde
* to_dict doc tweaks
* serializer text tweak
* produces -> outputs
* address review comments
* minor tweak
* tidy up protobuf serde (#1337)
* tidy up protobuf serde
* tweak sr explanation text
* tweak SerializeError text
* changes following review of JSON Schema PR
* Fixed failing test cases by changing the function names (#1419)
* tidy up avro serde + additional polish (#1413)
* tidy up avro serde
* SerializingProducer & DeserializingConsumer polish + other tweaks
* Deemphasise SerializingProducer and DeserializingConsumer
* Removing use of SerializingProducer and DeserializingConsumer
* Tweak examples readme
* CI error fixes
* Additional CI error fixes
* Additional CI error fix
* Fix examples
* fix flake8 error
* A bit of README.md polish (#1424)
* A bit of README.md polish
* additional tweaks
* updates based on review feedback

Co-authored-by: Pranav Rathi <[email protected]>
1 parent 6f7ffd8 commit 5936d0b

26 files changed: +810 −1146 lines

Diff for: README.md

+30 −127
````diff
@@ -3,7 +3,7 @@ Confluent's Python Client for Apache Kafka<sup>TM</sup>
 
 **confluent-kafka-python** provides a high-level Producer, Consumer and AdminClient compatible with all
 [Apache Kafka<sup>TM</sup>](http://kafka.apache.org/) brokers >= v0.8, [Confluent Cloud](https://www.confluent.io/confluent-cloud/)
-and the [Confluent Platform](https://www.confluent.io/product/compare/). The client is:
+and [Confluent Platform](https://www.confluent.io/product/compare/). The client is:
 
 - **Reliable** - It's a wrapper around [librdkafka](https://github.com/edenhill/librdkafka) (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using [the same set of system tests](https://github.com/confluentinc/confluent-kafka-python/tree/master/src/confluent_kafka/kafkatest) as the Java client [and more](https://github.com/confluentinc/confluent-kafka-python/tree/master/tests). It's supported by [Confluent](https://confluent.io).
 
````
````diff
@@ -15,23 +15,25 @@ with Apache Kafka at its core. It's high priority for us that client features keep
 pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).
 
 
-See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
+## Usage
 
 For a step-by-step guide on using the client see [Getting Started with Apache Kafka and Python](https://developer.confluent.io/get-started/python/).
 
+Additional examples can be found in the [examples](examples) directory or the [confluentinc/examples](https://github.com/confluentinc/examples/tree/master/clients/cloud/python) github repo, which include demonstration of:
+- Exactly once data processing using the transactional API.
+- Integration with asyncio.
+- (De)serializing Protobuf, JSON, and Avro data with Confluent Schema Registry integration.
+- [Confluent Cloud](https://www.confluent.io/confluent-cloud/) configuration.
 
-Usage
-=====
+Also refer to the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html).
 
-Below are some examples of typical usage. For more examples, see the [examples](examples) directory or the [confluentinc/examples](https://github.com/confluentinc/examples/tree/master/clients/cloud/python) github repo for a [Confluent Cloud](https://www.confluent.io/confluent-cloud/) example.
+Finally, the [tests](tests) are useful as a reference for example usage.
 
-
-**Producer**
+### Basic Producer Example
 
 ```python
 from confluent_kafka import Producer
 
-
 p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
 
 def delivery_report(err, msg):
````
````diff
@@ -46,23 +48,26 @@ for data in some_data_source:
     # Trigger any available delivery report callbacks from previous produce() calls
     p.poll(0)
 
-    # Asynchronously produce a message, the delivery report callback
-    # will be triggered from poll() above, or flush() below, when the message has
-    # been successfully delivered or failed permanently.
+    # Asynchronously produce a message. The delivery report callback will
+    # be triggered from the call to poll() above, or flush() below, when the
+    # message has been successfully delivered or failed permanently.
     p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
 
 # Wait for any outstanding messages to be delivered and delivery report
 # callbacks to be triggered.
 p.flush()
 ```
 
+For a discussion on the poll based producer API, refer to the
+[Integrating Apache Kafka With Python Asyncio Web Applications](https://www.confluent.io/blog/kafka-python-asyncio-integration/)
+blog post.
+
 
-**High-level Consumer**
+### Basic Consumer Example
 
 ```python
 from confluent_kafka import Consumer
 
-
 c = Consumer({
     'bootstrap.servers': 'mybroker',
     'group.id': 'mygroup',
````
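Note: the hunk above points readers to the asyncio blog post for bridging the poll-based producer into an event loop. As a rough illustration of that pattern, here is a minimal sketch; this code is not part of the commit, and the `AIOProducer` name and structure are assumptions based on the pattern the blog post describes:

```python
import asyncio
import threading

import confluent_kafka


class AIOProducer:
    """Minimal asyncio wrapper around the poll-based Producer (sketch)."""

    def __init__(self, conf, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(conf)
        self._cancelled = False
        # Serve delivery report callbacks from a dedicated poll thread.
        self._poll_thread = threading.Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        """Return an awaitable that resolves once delivery is confirmed."""
        result = self._loop.create_future()

        def ack(err, msg):
            # Delivery reports fire on the poll thread; marshal the result
            # back onto the event loop thread.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, confluent_kafka.KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._producer.produce(topic, value, on_delivery=ack)
        return result
```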
````diff
@@ -85,101 +90,8 @@ while True:
 c.close()
 ```
 
-**AvroProducer**
-
-```python
-from confluent_kafka import avro
-from confluent_kafka.avro import AvroProducer
-
-
-value_schema_str = """
-{
-   "namespace": "my.test",
-   "name": "value",
-   "type": "record",
-   "fields" : [
-     {
-       "name" : "name",
-       "type" : "string"
-     }
-   ]
-}
-"""
-
-key_schema_str = """
-{
-   "namespace": "my.test",
-   "name": "key",
-   "type": "record",
-   "fields" : [
-     {
-       "name" : "name",
-       "type" : "string"
-     }
-   ]
-}
-"""
-
-value_schema = avro.loads(value_schema_str)
-key_schema = avro.loads(key_schema_str)
-value = {"name": "Value"}
-key = {"name": "Key"}
-
-
-def delivery_report(err, msg):
-    """ Called once for each message produced to indicate delivery result.
-        Triggered by poll() or flush(). """
-    if err is not None:
-        print('Message delivery failed: {}'.format(err))
-    else:
-        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
-
-
-avroProducer = AvroProducer({
-    'bootstrap.servers': 'mybroker,mybroker2',
-    'on_delivery': delivery_report,
-    'schema.registry.url': 'http://schema_registry_host:port'
-    }, default_key_schema=key_schema, default_value_schema=value_schema)
-
-avroProducer.produce(topic='my_topic', value=value, key=key)
-avroProducer.flush()
-```
-
-**AvroConsumer**
-
-```python
-from confluent_kafka.avro import AvroConsumer
-from confluent_kafka.avro.serializer import SerializerError
-
-
-c = AvroConsumer({
-    'bootstrap.servers': 'mybroker,mybroker2',
-    'group.id': 'groupid',
-    'schema.registry.url': 'http://127.0.0.1:8081'})
 
-c.subscribe(['my_topic'])
-
-while True:
-    try:
-        msg = c.poll(10)
-
-    except SerializerError as e:
-        print("Message deserialization failed for {}: {}".format(msg, e))
-        break
-
-    if msg is None:
-        continue
-
-    if msg.error():
-        print("AvroConsumer error: {}".format(msg.error()))
-        continue
-
-    print(msg.value())
-
-c.close()
-```
-
-**AdminClient**
+### Basic AdminClient Example
 
 Create topics:
 
````
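Note: the `AvroProducer`/`AvroConsumer` examples removed above are superseded by the Schema Registry serializer API this PR stabilizes. A minimal sketch of the newer style follows; it is not part of this diff, the broker and registry addresses are placeholders, and the exact call shapes follow the `confluent_kafka.schema_registry` module:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

value_schema_str = """
{
  "namespace": "my.test",
  "name": "value",
  "type": "record",
  "fields": [{"name": "name", "type": "string"}]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://127.0.0.1:8081'})
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

p = Producer({'bootstrap.servers': 'mybroker'})

# Serialize explicitly, then hand plain bytes to the standard Producer,
# rather than going through AvroProducer.
p.produce(topic='my_topic',
          value=avro_serializer({"name": "Value"},
                                SerializationContext('my_topic', MessageField.VALUE)))
p.flush()
```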
````diff
@@ -205,15 +117,12 @@ for topic, f in fs.items():
 ```
 
 
-
-Thread Safety
--------------
+## Thread Safety
 
 The `Producer`, `Consumer` and `AdminClient` are all thread safe.
 
 
-Install
-=======
+## Install
 
 **Install self-contained binary wheels**
 
````
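The `create_topics` body referenced by the hunk header above (`for topic, f in fs.items():`) is elided by the diff view. For orientation, here is a minimal sketch of that kind of AdminClient usage; it is an assumption, not the file's exact content, and the broker address is a placeholder:

```python
from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

# create_topics() is asynchronous; it returns a dict of <topic, future>.
fs = a.create_topics([NewTopic(topic, num_partitions=3, replication_factor=1)
                      for topic in ["mytopic1", "mytopic2"]])

for topic, f in fs.items():
    try:
        f.result()  # The result itself is None on success.
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
```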
````diff
@@ -225,17 +134,13 @@ Install
 confluent-kafka using the instructions in the
 "Install from source" section below.
 
-**Install AvroProducer and AvroConsumer**
-
-    $ pip install "confluent-kafka[avro]"
-
 **Install from source**
 
 For source install, see the *Install from source* section in [INSTALL.md](INSTALL.md).
 
 
-Broker Compatibility
-====================
+## Broker Compatibility
+
 The Python client (as well as the underlying C library librdkafka) supports
 all broker versions >= 0.8.
 But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
````
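The compatibility note above is cut off by the diff context; the gist is that 0.8/0.9 brokers cannot report their version to the client. For such brokers, librdkafka exposes configuration properties like `api.version.request` and `broker.version.fallback`. A hedged sketch of that configuration (an illustration, not part of this commit; property names per librdkafka's documented configuration):

```python
from confluent_kafka import Consumer

# Brokers older than 0.10 cannot answer version requests, so disable the
# request and tell librdkafka which protocol version to assume.
c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'api.version.request': False,
    'broker.version.fallback': '0.9.0.1'
})
```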
````diff
@@ -257,8 +162,8 @@ More info here:
 https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
 
 
-SSL certificates
-================
+## SSL certificates
+
 If you're connecting to a Kafka cluster through SSL you will need to configure
 the client with `'security.protocol': 'SSL'` (or `'SASL_SSL'` if SASL
 authentication is used).
````
````diff
@@ -277,22 +182,20 @@ Python package. To use certifi, add an `import certifi` line and configure the
 client's CA location with `'ssl.ca.location': certifi.where()`.
 
 
-
-License
-=======
+## License
 
 [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)
 
 KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use
 by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by
 The Apache Software Foundation.
 
-Developer Notes
-===============
+
+## Developer Notes
 
 Instructions on building and testing confluent-kafka-python can be found [here](DEVELOPER.md).
 
-Confluent Cloud
-===============
+
+## Confluent Cloud
 
 For a step-by-step guide on using the Python client with Confluent Cloud see [Getting Started with Apache Kafka and Python](https://developer.confluent.io/get-started/python/) on [Confluent Developer](https://developer.confluent.io/).
````
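Putting the SSL guidance from the two hunks above together, a minimal client configuration might look like this (a sketch, not part of the commit; the broker address is a placeholder):

```python
import certifi

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker:9093',
    'group.id': 'mygroup',
    'security.protocol': 'SSL',
    # Point librdkafka at certifi's CA bundle, as the README suggests.
    'ssl.ca.location': certifi.where()
})
```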
