diff --git a/README.md b/README.md index 4677b07..69be7d6 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The implementation is specifically designed to handle only the wire protocol. Th ## Supported messages -One of the main features of this library is its ability to generate Elixir code using [Kafka message definitions](https://github.com/apache/kafka/tree/trunk/clients/src/main/resources/common/message). This ensures that all currently available and future messages (excluding the ones mentioned below) should be easily supported in all versions by the implementation. +One of the main features of this library is its ability to generate Elixir code using [Kafka message definitions](https://github.com/apache/kafka/tree/trunk/clients/src/main/resources/common/message). This ensures that all currently available and future messages (excluding the ones mentioned below) should be easily supported in all versions by the implementation. This implementation of the Kafka protocol supports all currently available messages and versions, except for `Fetch` versions below 4. This is due to a change in record batch serialization in newer versions (>=4) of this message. As a result, the library is NOT compatible with Kafka versions prior 0.11. @@ -46,6 +46,7 @@ Inside the `response` variable you will get something like this: headers: %{correlation_id: 123} } ``` + ## Messages API Each message has it's own set of fields, as described in the Kafka official documentation. On this library, each message has it's own module within `KlifeProtocol.Messages` namespace. In the previous example we used `KlifeProtocol.Messages.ApiVersions` which stands for the [API version message](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions). @@ -151,22 +152,40 @@ After, you can use the `deserialize_response/3` function of the messages API, pa {:ok, %{content: content}} = Messages.ApiVersions.deserialize_response(rest_data, version, false) ``` +## SASL + +SASL is handled by the `KlifeProtocol.Socket` module and client libraries can pass SASL options to `connect/3` function. + +For now the only supported mechanism is PLAIN and you can use it like this: + +```elixir +alias KlifeProtocol.Socket +sasl_opts = [ + mechanism: "PLAIN", + sasl_auth_vsn: 2, + sasl_handshake_vsn: 1, + mechanism_opts: [ + username: "klifeusr", + password: "klifepwd" + ] +] + +{:ok, socket} = Socket.connect("localhost", 9092, [backend: :ssl, sasl_opts: sasl_opts]) +``` ## Compression and Record Batch Attributes Currently supports two compression methods: `snappy` using [sanppyer library](https://github.com/zmstone/snappyer) and `gzip` using [erlang zlib library](https://www.erlang.org/doc/man/zlib.html). - To configure the compression strategy, along with other important data such as `timestampType` and `isTransactional`, you can utilize the Kafka record batch's [attributes byte](https://kafka.apache.org/documentation/#recordbatch). - Klife protocol provides an interface to simplify the creation of this attributes using the `encode_attributes/1` and `decode_attributes/1` functions of the `RecordBatch` module. Here's an example: ```elixir alias KlifeProtocol.RecordBatch opts = [ - compression: :snappy, + compression: :snappy, timestamp_type: :log_append_time, is_transactional: true, is_control_batch: true, @@ -185,6 +204,7 @@ attr_val = RecordBatch.encode_attributes(opts) ``` With these functions, a client can easily generate and decode attributes for each record batch it handles. + ## Performance This section provides performance benchmarks for the main use cases of produce serialization and fetch deserialization, conducted using the [benchee tool](https://github.com/bencheeorg/benchee). The benchmarks measure only the serialization work, as the Kafka cluster is running solely to retrieve data samples for benchmarking purposes. Note that network latency is not included in the measurements, only serialization/deserialization performance. @@ -200,28 +220,31 @@ Kernel: 6.1.0-13-amd64 (64-bit) ``` All benchmarks can be executed by running the benchmark mix task from the project's base folder: + ``` bash run-kafka.sh mix benchmark produce_serialization mix benchmark fetch_deserialization bash stop-kafka.sh ``` + ### Produce Serialization -| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg | -|---------|----------|---------|--------|--------|--------|---------|-------|----------| -| 1 | 500 kb | 3.92 k | 3.92 K | 254 μs | 252 μs | 300 μs | ±4.8% | 3 kb | -| 10 | 50 kb | 37.6 k | 3.76 K | 265 μs | 263 μs | 316 μs | ±4.4% | 12 kb | -| 50 | 10 kb | 146.6 k | 2.93 K | 341 μs | 340 μs | 393 μs | ±4.6% | 52 kb | -| 100 | 5 kb | 246.0 k | 2.46 K | 406 μs | 399 μs | 520 μs | ±7.5% | 94 kb | + +| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg | +| ------- | -------- | ------- | ------ | ------ | ------ | ------ | ----- | -------- | +| 1 | 500 kb | 3.92 k | 3.92 K | 254 μs | 252 μs | 300 μs | ±4.8% | 3 kb | +| 10 | 50 kb | 37.6 k | 3.76 K | 265 μs | 263 μs | 316 μs | ±4.4% | 12 kb | +| 50 | 10 kb | 146.6 k | 2.93 K | 341 μs | 340 μs | 393 μs | ±4.6% | 52 kb | +| 100 | 5 kb | 246.0 k | 2.46 K | 406 μs | 399 μs | 520 μs | ±7.5% | 94 kb | ### Fetch Deserialization -| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg | -|---------|----------|--------|--------|--------|--------|---------|------|----------| -| 1 | 500 kb | 4.23 k | 4.23 k | 236 μs | 224 μs | 400 μs | ±14% | 22 kb | -| 10 | 50 kb | 36.2 k | 3.62 k | 276 μs | 257 μs | 421 μs | ±17% | 69 kb | -| 50 | 10 kb | 116 k | 2.32 k | 430 μs | 419 μs | 558 μs | ±13% | 281 kb | -| 100 | 5 kb | 170 k | 1.70 k | 587 μs | 580 μs | 772 μs | ±13% | 545 kb | +| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg | +| ------- | -------- | ------ | ------ | ------ | ------ | ------ | ---- | -------- | +| 1 | 500 kb | 4.23 k | 4.23 k | 236 μs | 224 μs | 400 μs | ±14% | 22 kb | +| 10 | 50 kb | 36.2 k | 3.62 k | 276 μs | 257 μs | 421 μs | ±17% | 69 kb | +| 50 | 10 kb | 116 k | 2.32 k | 430 μs | 419 μs | 558 μs | ±13% | 281 kb | +| 100 | 5 kb | 170 k | 1.70 k | 587 μs | 580 μs | 772 μs | ±13% | 545 kb | ## Project Overview @@ -238,6 +261,7 @@ The project is composed by 5 main components: - Socket: Simple wrapper of `:gen_tcp` and `:ssl` `connect/3` function that set socket opts that are needed to proper communicate with kafka broker `packet: 4` and `binary`. It is intended to be used only for the socket initialization, all other operations must be done using `:gen_tcp` or `:ssl` directly. ![](./assets/overview.png "Project overview") + ## Running Tests ``` @@ -246,7 +270,7 @@ mix test bash stop-kafka.sh ``` -In order to prevent race conditions with kafka initialization is recommended to wait a couple seconds between `bash run-kafka.sh` and `mix test`. +In order to prevent race conditions with kafka initialization is recommended to wait a couple seconds between `bash run-kafka.sh` and `mix test`. If you want to run tests using a SSL connection, you can use an env var `CONN_MODE=SSL` like this: @@ -254,4 +278,4 @@ If you want to run tests using a SSL connection, you can use an env var `CONN_MO bash run-kafka.sh CONN_MODE=SSL mix test bash stop-kafka.sh -``` \ No newline at end of file +```