Commit 5925e64

Readme with Public API v1 (#109)
1 parent f5bce5d commit 5925e64

README.md

Lines changed: 97 additions & 22 deletions
@@ -1,6 +1,6 @@
-# 🚧WIP🚧: Swift Kafka Client
+# Swift Kafka Client

-Swift Kafka Client is a Swift Package in development that provides a convenient way to communicate with [Apache Kafka](https://kafka.apache.org) servers. The main goal was to create an API that leverages [Swift's new concurrency features](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html). Under the hood, this package uses the [`librdkafka`](https://github.com/confluentinc/librdkafka) C library.
+The Swift Kafka Client library provides a convenient way to interact with [Apache Kafka](https://kafka.apache.org) by leveraging [Swift's new concurrency features](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html). This package wraps the native [`librdkafka`](https://github.com/confluentinc/librdkafka) library.

 ## Adding Kafka as a Dependency

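The body of the "Adding Kafka as a Dependency" section is not part of this hunk. For orientation only, a minimal `Package.swift` entry might look like the sketch below; the repository URL, branch requirement, and the `Kafka` product name are assumptions rather than values taken from this commit.

```swift
// swift-tools-version: 5.9
// Sketch only: dependency declaration under the assumptions noted above.
import PackageDescription

let package = Package(
    name: "MyKafkaApp",
    dependencies: [
        // Assumed repository URL and branch requirement.
        .package(url: "https://github.com/swift-server/swift-kafka-client.git", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp",
            dependencies: [
                // Assumed product name.
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```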
@@ -32,12 +32,11 @@ Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https
 The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `events` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.

 ```swift
-let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
-var config = KafkaProducerConfiguration()
-config.bootstrapBrokerAddresses = [broker]
+let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
+let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

 let (producer, events) = try KafkaProducer.makeProducerWithEvents(
-    config: config,
+    configuration: configuration,
     logger: logger
 )

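The code that actually sends messages and reads the `events` sequence lies outside this hunk. As a rough, non-authoritative sketch of what the paragraph above describes (the `KafkaProducerMessage(topic:value:)` initializer and the handling of individual events are assumptions, not taken from this diff), the usage might look like:

```swift
// Sketch only: producer usage under the assumptions noted above.
try await withThrowingTaskGroup(of: Void.self) { group in
    // KafkaProducer implements swift-service-lifecycle's `Service`, so it has to be run.
    group.addTask {
        try await producer.run()
    }

    group.addTask {
        // `send(_:)` returns a message-id that identifies the later acknowledgement.
        let messageID = try producer.send(
            KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
        )
        print("Enqueued message \(messageID)")

        // Acknowledgements (successes or errors) arrive through the `events` sequence.
        for await event in events {
            print("Producer event: \(event)")
        }
    }

    try await group.waitForAll()
}
```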
@@ -79,17 +78,17 @@ await withThrowingTaskGroup(of: Void.self) { group in
 After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence).

 ```swift
-let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
-var config = KafkaConsumerConfiguration(
+let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
+let configuration = KafkaConsumerConfiguration(
     consumptionStrategy: .partition(
         KafkaPartition(rawValue: 0),
         topic: "topic-name"
-    )
+    ),
+    bootstrapBrokerAddresses: [brokerAddress]
 )
-config.bootstrapBrokerAddresses = [broker]

 let consumer = try KafkaConsumer(
-    config: config,
+    configuration: configuration,
     logger: logger
 )

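The consumption loop itself is also outside the hunk. A minimal sketch of reading the `messages` sequence (the `topic` and `offset` properties on the message are assumptions) could look like:

```swift
// Sketch only: consuming messages under the assumptions noted above.
try await withThrowingTaskGroup(of: Void.self) { group in
    // KafkaConsumer also implements `Service` and needs to be run.
    group.addTask {
        try await consumer.run()
    }

    group.addTask {
        // Each received message is delivered through the `messages` AsyncSequence.
        for try await message in consumer.messages {
            print("Received message on \(message.topic) at offset \(message.offset)")
        }
    }

    try await group.waitForAll()
}
```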
@@ -119,14 +118,14 @@ await withThrowingTaskGroup(of: Void.self) { group in
 Kafka also allows users to subscribe to an array of topics as part of a consumer group.

 ```swift
-let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
-var config = KafkaConsumerConfiguration(
-    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
+let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
+let configuration = KafkaConsumerConfiguration(
+    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
+    bootstrapBrokerAddresses: [brokerAddress]
 )
-config.bootstrapBrokerAddresses = [broker]

 let consumer = try KafkaConsumer(
-    config: config,
+    configuration: configuration,
     logger: logger
 )

@@ -156,15 +155,15 @@ await withThrowingTaskGroup(of: Void.self) { group in
 By default, the `KafkaConsumer` automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

 ```swift
-let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
-var config = KafkaConsumerConfiguration(
-    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"])
+let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
+var configuration = KafkaConsumerConfiguration(
+    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
+    bootstrapBrokerAddresses: [brokerAddress]
 )
-config.enableAutoCommit = false,
-config.bootstrapBrokerAddresses = [broker]
+configuration.isAutoCommitEnabled = false

 let consumer = try KafkaConsumer(
-    config: config,
+    configuration: configuration,
     logger: logger
 )

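The manual commit call itself is not part of this hunk either. A minimal sketch, assuming a `commitSync(_:)` method on `KafkaConsumer` (the method name is an assumption, not taken from this diff), might be:

```swift
// Sketch only: manual offset commits; the commit method name is an assumption.
for try await message in consumer.messages {
    // Process the message first ...

    // ... then commit its offset explicitly, since auto-commit was disabled above.
    try await consumer.commitSync(message)
}
```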
@@ -191,6 +190,82 @@ await withThrowingTaskGroup(of: Void.self) { group in
 }
 ```

+### Security Mechanisms
+
+Both the `KafkaProducer` and the `KafkaConsumer` can be configured to use different security mechanisms.
+
+#### Plaintext
+
+```swift
+var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
+configuration.securityProtocol = .plaintext
+```
+
+#### TLS
+
+```swift
+let leafCert = KafkaConfiguration.TLSConfiguration.LeafAndIntermediates.pem("YOUR_LEAF_CERTIFICATE")
+let rootCert = KafkaConfiguration.TLSConfiguration.Root.pem("YOUR_ROOT_CERTIFICATE")
+
+let privateKey = KafkaConfiguration.TLSConfiguration.PrivateKey(
+    location: .file(location: "KEY_FILE"),
+    password: ""
+)
+
+let tlsConfig = KafkaConfiguration.TLSConfiguration.keyPair(
+    privateKey: privateKey,
+    publicKeyCertificate: leafCert,
+    caCertificate: rootCert,
+    crlLocation: nil
+)
+
+var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
+configuration.securityProtocol = .tls(configuration: tlsConfig)
+```
+
+#### SASL
+
+```swift
+let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
+    keytab: "KEYTAB_FILE"
+)
+
+var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
+config.securityProtocol = .saslPlaintext(
+    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
+)
+```
+
+#### SASL + TLS
+
+```swift
+let leafCert = KafkaConfiguration.TLSConfiguration.LeafAndIntermediates.pem("YOUR_LEAF_CERTIFICATE")
+let rootCert = KafkaConfiguration.TLSConfiguration.Root.pem("YOUR_ROOT_CERTIFICATE")
+
+let privateKey = KafkaConfiguration.TLSConfiguration.PrivateKey(
+    location: .file(location: "KEY_FILE"),
+    password: ""
+)
+
+let tlsConfig = KafkaConfiguration.TLSConfiguration.keyPair(
+    privateKey: privateKey,
+    publicKeyCertificate: leafCert,
+    caCertificate: rootCert,
+    crlLocation: nil
+)
+
+let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
+    username: "USERNAME",
+    password: "PASSWORD"
+)
+
+var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
+config.securityProtocol = .saslTLS(
+    saslMechanism: saslMechanism,
+    tlsConfiguaration: tlsConfig
+)
+```
+
 ## librdkafka

 The Package depends on [the librdkafka library](https://github.com/confluentinc/librdkafka), which is included as a git submodule.
