From b42b3f9efcba1f21819bb843d5d66f0ce7b89a82 Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Mon, 24 Jul 2023 20:14:48 +0300
Subject: [PATCH 1/5] add approx. transactional api

---
 Package.swift                                 |   2 +
 .../KafkaProducerConfiguration.swift          |   5 +
 Sources/SwiftKafka/KafkaConsumer.swift        |  21 +++
 Sources/SwiftKafka/KafkaError.swift           |  48 +++++-
 Sources/SwiftKafka/KafkaProducer.swift        |  54 +++++++
 .../SwiftKafka/RDKafka/RDKafkaClient.swift    | 143 ++++++++++++++++++
 .../RDKafka/RDKafkaTopicPartitionList.swift   |   8 +
 7 files changed, 279 insertions(+), 2 deletions(-)

diff --git a/Package.swift b/Package.swift
index 33d6397c..90aaa0da 100644
--- a/Package.swift
+++ b/Package.swift
@@ -44,6 +44,7 @@ let package = Package(
         .package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
         .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
+        .package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
         .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -73,6 +74,7 @@ let package = Package(
             name: "SwiftKafka",
             dependencies: [
                 "Crdkafka",
+                .product(name: "ConcurrencyHelpers", package: "package-concurrency-helpers", moduleAliases: ["ConcurrencyHelpers" : "BlockingCallWrapper"]),
                 .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),
diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
index 0b444736..5be5e1c9 100644
--- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -36,6 +36,8 @@ public struct KafkaProducerConfiguration {
     /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.
     /// Default: `false`
     public var enableIdempotence: Bool = false
+
+    /// The transactional ID; when set, it enables the transactional producer (librdkafka's `transactional.id`).
+    /// Default: `nil`
+    public var transactionalId: String?
 
     /// Producer queue options.
     public var queue: KafkaConfiguration.QueueOptions = .init()
@@ -108,6 +110,9 @@ extension KafkaProducerConfiguration {
         var resultDict: [String: String] = [:]
 
         resultDict["enable.idempotence"] = String(self.enableIdempotence)
+        if let transactionalId {
+            resultDict["transactional.id"] = transactionalId
+        }
         resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
         resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
         resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift
index c8f27dad..ed710108 100644
--- a/Sources/SwiftKafka/KafkaConsumer.swift
+++ b/Sources/SwiftKafka/KafkaConsumer.swift
@@ -301,6 +301,10 @@ public final class KafkaConsumer: Sendable, Service {
             }
         }
     }
+    
+    func client() throws -> RDKafkaClient {
+        return try stateMachine.withLockedValue { try $0.client() }
+    }
 }
 
 // MARK: - KafkaConsumer + StateMachine
@@ -548,5 +552,22 @@ extension KafkaConsumer {
                 return nil
             }
         }
+        
+        func client() throws -> RDKafkaClient {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .initializing(let client, _):
+                return client
+            case .consuming(let client, _):
+                return client
+            case .consumptionStopped(let client):
+                return client
+            case .finishing(let client):
+                return client
+            case .finished:
+                throw KafkaError.client(reason: "Client is stopped")
+            }
+        }
     }
 }
diff --git a/Sources/SwiftKafka/KafkaError.swift b/Sources/SwiftKafka/KafkaError.swift
index bfa93c8a..e771be1c 100644
--- a/Sources/SwiftKafka/KafkaError.swift
+++ b/Sources/SwiftKafka/KafkaError.swift
@@ -54,7 +54,7 @@ public struct KafkaError: Error, CustomStringConvertible {
     }
 
     static func rdKafkaError(
-        wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line
+        wrapping error: rd_kafka_resp_err_t, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
     ) -> KafkaError {
         let errorMessage = String(cString: rd_kafka_err2str(error))
         return KafkaError(
@@ -143,6 +143,36 @@ public struct KafkaError: Error, CustomStringConvertible {
             )
         )
     }
+    
+    static func transactionAborted(
+        reason: String, file: String = #fileID, line: UInt = #line
+    ) -> KafkaError {
+        return KafkaError(
+            backing: .init(
+                code: .transactionAborted, reason: reason, file: file, line: line
+            )
+        )
+    }
+    
+    static func transactionIncomplete(
+        reason: String, file: String = #fileID, line: UInt = #line
+    ) -> KafkaError {
+        return KafkaError(
+            backing: .init(
+                code: .transactionIncomplete, reason: reason, file: file, line: line
+            )
+        )
+    }
+    
+    static func transactionOutOfAttempts(
+        numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line
+    ) -> KafkaError {
+        return KafkaError(
+            backing: .init(
+                code: .transactionOutOfAttempts, reason: "Out of \(numOfAttempts) attempts", file: file, line: line
+            )
+        )
+    }
 }
 
 extension KafkaError {
@@ -162,6 +192,10 @@ extension KafkaError {
             case messageConsumption
             case topicCreation
             case topicDeletion
+            case transactionAborted
+            case transactionIncomplete
+            case notInTransaction // FIXME: maybe add a subcode?
+            case transactionOutOfAttempts
         }
 
         fileprivate var backingCode: BackingCode
@@ -188,6 +222,12 @@ extension KafkaError {
         public static let topicCreation = ErrorCode(.topicCreation)
         /// Deleting a topic failed.
         public static let topicDeletion = ErrorCode(.topicDeletion)
+        /// Transaction was aborted (can be retried from scratch).
+        public static let transactionAborted = ErrorCode(.transactionAborted)
+        /// Transaction could not be completed.
+        public static let transactionIncomplete = ErrorCode(.transactionIncomplete)
+        /// Ran out of the provided number of attempts.
+        public static let transactionOutOfAttempts = ErrorCode(.transactionOutOfAttempts)
 
         public var description: String {
             return String(describing: self.backingCode)
@@ -206,17 +246,21 @@ extension KafkaError {
         let file: String
 
         let line: UInt
+        
+        let isFatal: Bool
 
         fileprivate init(
             code: KafkaError.ErrorCode,
             reason: String,
             file: String,
-            line: UInt
+            line: UInt,
+            isFatal: Bool = false
         ) {
             self.code = code
             self.reason = reason
             self.file = file
             self.line = line
+            self.isFatal = isFatal
         }
 
         // Only the error code matters for equality.
diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift
index b3ed9462..2bb4add3 100644
--- a/Sources/SwiftKafka/KafkaProducer.swift
+++ b/Sources/SwiftKafka/KafkaProducer.swift
@@ -265,6 +265,43 @@ public final class KafkaProducer: Service, Sendable {
             return KafkaProducerMessageID(rawValue: newMessageID)
         }
     }
+    
+    func initTransactions(timeout: Duration = .seconds(5)) async throws {
+        guard config.transactionalId != nil else {
+            throw KafkaError.config(
+                reason: "Could not initialize transactions because transactionalId is not set in config")
+        }
+        // FIXME: maybe add state 'startedWithTransactions'?
+        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
+        try await client.initTransactions(timeout: timeout)
+    }
+    
+    func beginTransaction() throws {
+        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
+        try client.beginTransaction()
+    }
+    
+    func send(
+        offsets: RDKafkaTopicPartitionList,
+        forConsumer consumer: KafkaConsumer,
+        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
+        attempts: UInt64 = .max
+    ) async throws {
+        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
+        let consumerClient = try consumer.client()
+        try await consumerClient.withKafkaHandlePointer {
+            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
+        }
+    }
+    
+    func abortTransaction(
+        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
+        attempts: UInt64) async throws {
+        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
+        try await client.abortTransaction(attempts: attempts, timeout: timeout)
+    }
 }
 
 // MARK: - KafkaProducer + StateMachine
@@ -455,5 +492,22 @@ extension KafkaProducer {
                 break
             }
         }
+
+        // TODO:
+        // 1. add client()
+        // 2. initTransactions() -> change state to startedWithTransactions
+        // 3. transactionsClient() -> return client only for startedWithTransactions
+        
+        
+        func transactionsClient() throws -> RDKafkaClient {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .started(let client, _, _, _):
+                return client
+            default:
+                throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
+            }
+        }
     }
 }
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
index 38473484..b52b985f 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
@@ -13,6 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 import Crdkafka
+import BlockingCallWrapper
 import Dispatch
 import Logging
 
@@ -436,6 +437,14 @@ final class RDKafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         return try body(self.kafkaHandle)
     }
+    
+    /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle from an async closure.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure that will use the Kafka handle pointer.
+    @discardableResult
+    func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
+        return try await body(self.kafkaHandle)
+    }
 
     /// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
     /// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory.
@@ -462,4 +471,138 @@ final class RDKafkaClient: Sendable {
 
         return messageResult
     }
+    
+    func initTransactions(timeout: Duration) async throws {
+        let result = await forBlockingFunc {
+            rd_kafka_init_transactions(self.kafkaHandle, Int32(timeout.totalMilliseconds))
+        }
+        
+        if result != nil {
+            let code = rd_kafka_error_code(result)
+            rd_kafka_error_destroy(result)
+            throw KafkaError.rdKafkaError(wrapping: code)
+        }
+    }
+    
+    func beginTransaction() throws {
+        let result = rd_kafka_begin_transaction(kafkaHandle)
+        if result != nil {
+            let code = rd_kafka_error_code(result)
+            rd_kafka_error_destroy(result)
+            throw KafkaError.rdKafkaError(wrapping: code)
+        }
+    }
+    
+    func send(
+        attempts: UInt64,
+        offsets: RDKafkaTopicPartitionList,
+        forConsumerKafkaHandle consumer: OpaquePointer,
+        timeout: Duration) async throws {
+            try await offsets.withListPointer { topicPartitionList in
+                
+                let consumerMetadata = rd_kafka_consumer_group_metadata(consumer)
+                defer { rd_kafka_consumer_group_metadata_destroy(consumerMetadata) }
+                
+                // TODO: actually it should be within some timeout (like transaction timeout or session timeout)
+                for idx in 0..<attempts {
+                    let error = await forBlockingFunc {
+                        rd_kafka_send_offsets_to_transaction(self.kafkaHandle, topicPartitionList,
+                                                             consumerMetadata, timeout.totalMillisecondsOrMinusOne)
+                    }
+                    
+                    /* check if offset commit is completed successfully  */
+                    if error == nil {
+                        return
+                    }
+                    defer { rd_kafka_error_destroy(error) }
+                    
+                    /* check if offset commit is retriable */
+                    if rd_kafka_error_is_retriable(error) == 1 {
+                        continue
+                    }
+                    
+                    /* check if transaction needs to be aborted */
+                    if rd_kafka_error_txn_requires_abort(error) == 1 {
+                        do {
+                            try await abortTransaction(attempts: attempts - idx, timeout: timeout)
+                        } catch {
+                            throw KafkaError.transactionIncomplete(
+                                reason: "Could not complete or abort transaction with error \(error)")
+                        }
+                        // Throw outside the do/catch so it is not swallowed by the catch above.
+                        throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
+                    }
+                    let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
+                    throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
+                }
+            }
+            throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+        }
+    
+    func abortTransaction(attempts: UInt64, timeout: Duration) async throws {
+        for _ in 0..<attempts {
+            let error = await forBlockingFunc {
+                rd_kafka_abort_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
+            }
+            /* check if transaction abort is completed successfully  */
+            if error == nil {
+                return
+            }
+            defer { rd_kafka_error_destroy(error) }
+
+            /* check if transaction abort is retriable */
+            if rd_kafka_error_is_retriable(error) == 1 {
+                continue
+            }
+            let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
+            throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
+        }
+        throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+    }
+    
+    func commitTransaction(attempts: UInt64, timeout: Duration) async throws {
+        for idx in 0..<attempts {
+            let error = await forBlockingFunc {
+                rd_kafka_commit_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
+            }
+            /* check if transaction is completed successfully  */
+            if error == nil {
+                return
+            }
+            // destroy the error at the end of this iteration, including on `continue`
+            defer { rd_kafka_error_destroy(error) }
+
+            /* check if transaction is retriable */
+            if rd_kafka_error_is_retriable(error) == 1 {
+                continue
+            }
+            
+            /* check if transaction needs to be aborted */
+            if rd_kafka_error_txn_requires_abort(error) == 1 {
+                do {
+                    try await abortTransaction(attempts: attempts - idx, timeout: timeout)
+                } catch {
+                    throw KafkaError.transactionIncomplete(
+                        reason: "Could not complete or abort transaction with error \(error)")
+                }
+                // Throw outside the do/catch so it is not swallowed by the catch above.
+                throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
+            }
+            /* check if error is fatal */
+            let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
+            throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
+        }
+        throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+    }
+}
+
+// TODO: tmp, should be in other PRs
+extension Duration {
+    // Internal usage only: librdkafka accepts Int32 as timeouts
+    var totalMilliseconds: Int32 {
+        return Int32(self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000)
+    }
+    
+    var totalMillisecondsOrMinusOne: Int32 {
+        return max(totalMilliseconds, -1)
+    }
+    
+    public static let kafkaUntilEndOfTransactionTimeout: Duration = .milliseconds(-1)
+    public static let kafkaNoWaitTransaction: Duration = .zero
 }
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
index b1a5a813..d14532fb 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
@@ -57,4 +57,12 @@ final class RDKafkaTopicPartitionList {
     func withListPointer<T>(_ body: (UnsafeMutablePointer<rd_kafka_topic_partition_list_t>) throws -> T) rethrows -> T {
         return try body(self._internal)
     }
+    
+    /// Scoped accessor that enables safe access to the pointer of the underlying `rd_kafka_topic_partition_t`.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the pointer.
+    @discardableResult
+    func withListPointer<T>(_ body: (UnsafeMutablePointer<rd_kafka_topic_partition_list_t>) async throws -> T) async rethrows -> T {
+        return try await body(self._internal)
+    }
 }

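For orientation, a minimal usage sketch of the API introduced in this patch
(illustrative only, not part of the diff; `offsets`, `consumer`, and `logger`
are assumed to exist, and the transactional methods are module-internal at
this stage). Note that commit is so far only exposed on `RDKafkaClient`,
while `KafkaProducer` offers init/begin/send/abort:

    var config = KafkaProducerConfiguration()
    config.transactionalId = "example-transactional-id" // placeholder ID
    let (producer, _) = try KafkaProducer.makeProducerWithEvents(config: config, logger: logger)

    try await producer.initTransactions() // once per producer lifetime
    try producer.beginTransaction()
    _ = try producer.send(KafkaProducerMessage(topic: "target-topic", value: "payload"))
    // Commit the consumed offsets as part of the transaction rather than on
    // the consumer itself; on failure, roll back with abortTransaction:
    try await producer.send(offsets: offsets, forConsumer: consumer)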
From f5bfbe21a0ab683d1ceb2e2378e30dcccbb136d6 Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Tue, 25 Jul 2023 10:58:56 +0300
Subject: [PATCH 2/5] add state for initialized transactions

---
 Sources/SwiftKafka/KafkaProducer.swift        | 56 ++++++++++++++-----
 .../SwiftKafka/RDKafka/RDKafkaClient.swift    |  3 +-
 2 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift
index 2bb4add3..6d412137 100644
--- a/Sources/SwiftKafka/KafkaProducer.swift
+++ b/Sources/SwiftKafka/KafkaProducer.swift
@@ -271,8 +271,7 @@ public final class KafkaProducer: Service, Sendable {
             throw KafkaError.config(
                 reason: "Could not initialize transactions because transactionalId is not set in config")
         }
-        // FIXME: maybe add state 'startedWithTransactions'?
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
+        let client = try self.stateMachine.withLockedValue { try $0.initTransactions() }
         try await client.initTransactions(timeout: timeout)
     }
     
@@ -329,6 +328,18 @@ extension KafkaProducer {
                 source: Producer.Source?,
                 topicHandles: RDKafkaTopicHandles
             )
+            /// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
+            ///
+            /// - Parameter messageIDCounter: Used to incrementally assign unique IDs to messages.
+            /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+            /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
+            /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
+            case startedWithTransactions(
+                client: RDKafkaClient,
+                messageIDCounter: UInt,
+                source: Producer.Source?,
+                topicHandles: RDKafkaTopicHandles
+            )
             /// Producer is still running but the acknowledgement asynchronous sequence was terminated.
             /// All incoming acknowledgements will be dropped.
             ///
@@ -396,7 +407,7 @@ extension KafkaProducer {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _):
+            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
                 return .pollAndYield(client: client, source: source)
             case .consumptionStopped(let client):
                 return .pollWithoutYield(client: client)
@@ -439,6 +450,19 @@ extension KafkaProducer {
                     newMessageID: newMessageID,
                     topicHandles: topicHandles
                 )
+            case .startedWithTransactions(let client, let messageIDCounter, let source, let topicHandles):
+                let newMessageID = messageIDCounter + 1
+                self.state = .startedWithTransactions(
+                    client: client,
+                    messageIDCounter: newMessageID,
+                    source: source,
+                    topicHandles: topicHandles
+                )
+                return .send(
+                    client: client,
+                    newMessageID: newMessageID,
+                    topicHandles: topicHandles
+                )
             case .consumptionStopped:
                 throw KafkaError.connectionClosed(reason: "Sequence consuming acknowledgements was abruptly terminated, producer closed")
             case .finishing:
@@ -464,7 +488,7 @@ extension KafkaProducer {
                 fatalError("\(#function) invoked while still in state \(self.state)")
             case .consumptionStopped:
                 fatalError("messageSequenceTerminated() must not be invoked more than once")
-            case .started(let client, _, let source, _):
+            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
                 self.state = .consumptionStopped(client: client)
                 return .finishSource(source: source)
             case .finishing(let client, let source):
@@ -484,7 +508,7 @@ extension KafkaProducer {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _):
+            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
                 self.state = .finishing(client: client, source: source)
             case .consumptionStopped(let client):
                 self.state = .finishing(client: client, source: nil)
@@ -493,21 +517,25 @@ extension KafkaProducer {
             }
         }
 
-        // TODO:
-        // 1. add client()
-        // 2. initTransactions() -> change state to startedWithTransactions
-        // 3. transactionsClient() -> return client only for startedWithTransactions
-        
-        
-        func transactionsClient() throws -> RDKafkaClient {
+        mutating func initTransactions() throws -> RDKafkaClient {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, _, _):
+            case .started(let client, let messageIDCounter, let source, let topicHandles):
+                self.state = .startedWithTransactions(client: client, messageIDCounter: messageIDCounter, source: source, topicHandles: topicHandles)
                 return client
-            default:
+            case .startedWithTransactions:
+                throw KafkaError.config(reason: "Transactions were already initialized")
+            case .consumptionStopped, .finishing, .finished:
                 throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
             }
         }
+        
+        func transactionsClient() throws -> RDKafkaClient {
+            guard case let .startedWithTransactions(client, _, _, _) = self.state else {
+                throw KafkaError.transactionAborted(reason: "Transactions were not initialized or the producer is being stopped")
+            }
+            return client
+        }
     }
 }
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
index b52b985f..63f9b3b5 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
@@ -474,7 +474,7 @@ final class RDKafkaClient: Sendable {
     
     func initTransactions(timeout: Duration) async throws {
         let result = await forBlockingFunc {
-            rd_kafka_init_transactions(self.kafkaHandle, Int32(timeout.totalMilliseconds))
+            rd_kafka_init_transactions(self.kafkaHandle, timeout.totalMilliseconds)
         }
         
         if result != nil {
@@ -592,7 +592,6 @@ final class RDKafkaClient: Sendable {
     }
 }
 
-// TODO: tmp, should be in other PRs
 extension Duration {
     // Internal usage only: librdkafka accepts Int32 as timeouts
     var totalMilliseconds: Int32 {

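As a quick check of the `Duration` helpers this patch now relies on (an
illustrative sketch, assuming the definitions above):

    let d: Duration = .milliseconds(1500)
    // components.seconds == 1, components.attoseconds == 5 * 10^17,
    // so totalMilliseconds == 1 * 1000 + 500 == 1500
    precondition(d.totalMilliseconds == 1500)
    // The "until end of transaction" sentinel maps to -1, which librdkafka
    // treats as "use the remaining transaction time" for transactional calls.
    precondition(Duration.kafkaUntilEndOfTransactionTimeout.totalMillisecondsOrMinusOne == -1)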
From 7da49018bc76a4ec0d08cf492ece9dded1c3adaf Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Mon, 31 Jul 2023 16:09:30 +0300
Subject: [PATCH 3/5] address some review comments

---
 Package.swift                                 |   2 -
 .../KafkaProducerConfiguration.swift          |  42 +++++-
 Sources/SwiftKafka/KafkaProducer.swift        |  93 +++----------
 Sources/SwiftKafka/KafkaTransaction.swift     |  43 ++++++
 .../KafkaTransactionalProducer.swift          |  83 ++++++++++++
 .../SwiftKafka/RDKafka/RDKafkaClient.swift    |  26 ++--
 .../RDKafka/RDKafkaTopicConfig.swift          |  21 ++-
 .../RDKafka/RDKafkaTopicPartitionList.swift   |   2 +-
 .../SwiftKafka/Utilities/BlockingCall.swift   |  10 ++
 Tests/IntegrationTests/SwiftKafkaTests.swift  | 125 +++++++++++++++++-
 Tests/IntegrationTests/Utilities.swift        |   2 +-
 11 files changed, 354 insertions(+), 95 deletions(-)
 create mode 100644 Sources/SwiftKafka/KafkaTransaction.swift
 create mode 100644 Sources/SwiftKafka/KafkaTransactionalProducer.swift
 create mode 100644 Sources/SwiftKafka/Utilities/BlockingCall.swift

diff --git a/Package.swift b/Package.swift
index 90aaa0da..33d6397c 100644
--- a/Package.swift
+++ b/Package.swift
@@ -44,7 +44,6 @@ let package = Package(
         .package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
         .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
-        .package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
         .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -74,7 +73,6 @@ let package = Package(
             name: "SwiftKafka",
             dependencies: [
                 "Crdkafka",
-                .product(name: "ConcurrencyHelpers", package: "package-concurrency-helpers", moduleAliases: ["ConcurrencyHelpers" : "BlockingCallWrapper"]),
                 .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),
diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
index 2acf6f44..01f9d367 100644
--- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -36,8 +36,6 @@ public struct KafkaProducerConfiguration {
     /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.
     /// Default: `false`
     public var enableIdempotence: Bool = false
-    
-    public var transactionalId: String?
 
     /// Producer queue options.
     public var queue: KafkaConfiguration.QueueOptions = .init()
@@ -99,6 +97,8 @@ public struct KafkaProducerConfiguration {
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
+    
+    internal var transactionalId: String?
 
     public init() {}
 }
@@ -112,6 +112,9 @@ extension KafkaProducerConfiguration {
         resultDict["enable.idempotence"] = String(self.enableIdempotence)
         if let transactionalId {
             resultDict["transactional.id"] = transactionalId
+            resultDict["transaction.timeout.ms"] = "60000"
+            resultDict["message.timeout.ms"] = "60000"
+            
         }
         resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
         resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
@@ -191,3 +194,38 @@ extension KafkaConfiguration {
         }
     }
 }
+
+// FIXME: should we really duplicate `KafkaProducerConfiguration` once the public API is updated?
+public struct KafkaTransactionalProducerConfiguration {
+    var transactionalId: String
+    var transactionsTimeout: Duration
+    
+    var producerConfiguration: KafkaProducerConfiguration {
+        set {
+            self.producerConfiguration_ = newValue
+        }
+        get {
+            var conf = self.producerConfiguration_
+            conf.transactionalId = self.transactionalId
+            conf.enableIdempotence = true
+            conf.maxInFlightRequestsPerConnection = min(conf.maxInFlightRequestsPerConnection, 5)
+            return conf
+        }
+    }
+    
+    private var producerConfiguration_: KafkaProducerConfiguration = .init()
+    
+    public init(transactionalId: String, transactionsTimeout: Duration = .kafkaUntilEndOfTransactionTimeout, producerConfiguration: KafkaProducerConfiguration = .init()) {
+        self.transactionalId = transactionalId
+        self.transactionsTimeout = transactionsTimeout
+        self.producerConfiguration = producerConfiguration
+    }
+}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Hashable
+
+extension KafkaTransactionalProducerConfiguration: Hashable {}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Sendable
+
+extension KafkaTransactionalProducerConfiguration: Sendable {}
diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift
index c6040f65..6133c871 100644
--- a/Sources/SwiftKafka/KafkaProducer.swift
+++ b/Sources/SwiftKafka/KafkaProducer.swift
@@ -259,42 +259,10 @@ public final class KafkaProducer: Service, Sendable {
             return KafkaProducerMessageID(rawValue: newMessageID)
         }
     }
-    
-    func initTransactions(timeout: Duration = .seconds(5)) async throws {
-        guard config.transactionalId != nil else {
-            throw KafkaError.config(
-                reason: "Could not initialize transactions because transactionalId is not set in config")
-        }
-        let client = try self.stateMachine.withLockedValue { try $0.initTransactions() }
-        try await client.initTransactions(timeout: timeout)
-    }
-    
-    func beginTransaction() throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        try client.beginTransaction()
-    }
-    
-    func send(
-        offsets: RDKafkaTopicPartitionList,
-        forConsumer consumer: KafkaConsumer,
-        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
-        attempts: UInt64 = .max
-    ) async throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        let consumerClient = try consumer.client()
-        try await consumerClient.withKafkaHandlePointer {
-            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
-        }
-    }
-    
-    func abortTransaction(
-        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
-        attempts: UInt64) async throws {
-        let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
-        try await client.abortTransaction(attempts: attempts, timeout: timeout)
+
+    func client() throws -> RDKafkaClient {
+        try self.stateMachine.withLockedValue { try $0.client() }
     }
 }
 
 // MARK: - KafkaProducer + StateMachine
@@ -322,18 +290,6 @@ extension KafkaProducer {
                 source: Producer.Source?,
                 topicHandles: RDKafkaTopicHandles
             )
-            /// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
-            ///
-            /// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
-            /// - Parameter client: Client used for handling the connection to the Kafka cluster.
-            /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-            /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
-            case startedWithTransactions(
-                client: RDKafkaClient,
-                messageIDCounter: UInt,
-                source: Producer.Source?,
-                topicHandles: RDKafkaTopicHandles
-            )
             /// Producer is still running but the event asynchronous sequence was terminated.
             /// All incoming events will be dropped.
             ///
@@ -401,7 +357,7 @@ extension KafkaProducer {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 return .pollAndYield(client: client, source: source)
             case .consumptionStopped(let client):
                 return .pollWithoutYield(client: client)
@@ -444,19 +400,6 @@ extension KafkaProducer {
                     newMessageID: newMessageID,
                     topicHandles: topicHandles
                 )
-            case .startedWithTransactions(let client, let messageIDCounter, let source, let topicHandles):
-                let newMessageID = messageIDCounter + 1
-                self.state = .startedWithTransactions(
-                    client: client,
-                    messageIDCounter: newMessageID,
-                    source: source,
-                    topicHandles: topicHandles
-                )
-                return .send(
-                    client: client,
-                    newMessageID: newMessageID,
-                    topicHandles: topicHandles
-                )
             case .consumptionStopped:
                 throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed")
             case .finishing:
@@ -482,7 +425,7 @@ extension KafkaProducer {
                 fatalError("\(#function) invoked while still in state \(self.state)")
             case .consumptionStopped:
                 fatalError("messageSequenceTerminated() must not be invoked more than once")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 self.state = .consumptionStopped(client: client)
                 return .finishSource(source: source)
             case .finishing(let client, let source):
@@ -502,7 +445,7 @@ extension KafkaProducer {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
+            case .started(let client, _, let source, _):
                 self.state = .finishing(client: client, source: source)
             case .consumptionStopped(let client):
                 self.state = .finishing(client: client, source: nil)
@@ -510,26 +453,20 @@ extension KafkaProducer {
                 break
             }
         }
-
-        mutating func initTransactions() throws -> RDKafkaClient {
+        
+        func client() throws -> RDKafkaClient {
             switch self.state {
             case .uninitialized:
                 fatalError("\(#function) invoked while still in state \(self.state)")
-            case .started(let client, let messageIDCounter, let source, let topicHandles):
-                self.state = .startedWithTransactions(client: client, messageIDCounter: messageIDCounter, source: source, topicHandles: topicHandles)
+            case .started(let client, _, _, _):
                 return client
-            case .startedWithTransactions:
-                throw KafkaError.config(reason: "Transactions were already initialized")
-            case .consumptionStopped, .finishing, .finished:
-                throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
-            }
-        }
-        
-        func transactionsClient() throws -> RDKafkaClient {
-            guard case let .startedWithTransactions(client, _, _, _) = self.state else {
-                throw KafkaError.transactionAborted(reason: "Transactions were not initialized or producer is being stopped")
+            case .consumptionStopped(let client):
+                return client
+            case .finishing(let client, _):
+                return client
+            case .finished:
+                throw KafkaError.connectionClosed(reason: "Client stopped")
             }
-            return client
         }
     }
 }
diff --git a/Sources/SwiftKafka/KafkaTransaction.swift b/Sources/SwiftKafka/KafkaTransaction.swift
new file mode 100644
index 00000000..cd1969df
--- /dev/null
+++ b/Sources/SwiftKafka/KafkaTransaction.swift
@@ -0,0 +1,43 @@
+public final class KafkaTransaction {
+    let client: RDKafkaClient
+    let producer: KafkaProducer
+    let config: KafkaTransactionalProducerConfiguration
+    
+    init(client: RDKafkaClient, producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) throws {
+        self.client = client
+        self.producer = producer
+        self.config = config
+
+        try client.beginTransaction()
+    }
+
+    public func send(
+        offsets: RDKafkaTopicPartitionList,
+        forConsumer consumer: KafkaConsumer,
+        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
+        attempts: UInt64 = .max
+    ) async throws {
+        let consumerClient = try consumer.client()
+        try await consumerClient.withKafkaHandlePointer {
+            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
+        }
+    }
+    
+    @discardableResult
+    public func send<Key, Value>(_ message: KafkaProducerMessage<Key, Value>) throws -> KafkaProducerMessageID {
+        try self.producer.send(message)
+    }
+    
+    func commit() async throws {
+        try await client.commitTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
+    
+    func abort() async throws {
+        try await client.abortTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
+}
diff --git a/Sources/SwiftKafka/KafkaTransactionalProducer.swift b/Sources/SwiftKafka/KafkaTransactionalProducer.swift
new file mode 100644
index 00000000..d35c8f26
--- /dev/null
+++ b/Sources/SwiftKafka/KafkaTransactionalProducer.swift
@@ -0,0 +1,83 @@
+import Logging
+import ServiceLifecycle
+
+public final class KafkaTransactionalProducer: Service, Sendable {
+    let producer: KafkaProducer
+    let config: KafkaTransactionalProducerConfiguration
+    
+    private init(producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) async throws {
+        self.producer = producer
+        self.config = config
+        let client = try producer.client()
+        try await client.initTransactions(timeout: config.transactionsTimeout)
+    }
+
+    /// Initialize a new ``KafkaTransactionalProducer``.
+    ///
+    /// This creates a producer without listening for events.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the ``KafkaTransactionalProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: The newly created ``KafkaTransactionalProducer``.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public convenience init(
+        config: KafkaTransactionalProducerConfiguration,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
+    ) async throws {
+        let producer = try KafkaProducer(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
+        try await self.init(producer: producer, config: config)
+    }
+    
+    /// Initialize a new ``KafkaTransactionalProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
+    ///
+    /// Use the asynchronous sequence to consume events.
+    ///
+    /// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
+    /// Additionally, make sure to consume the asynchronous sequence otherwise the events will be buffered in memory indefinitely.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the ``KafkaTransactionalProducer``.
+    /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
+    /// - Parameter logger: A logger.
+    /// - Returns: A tuple containing the created ``KafkaTransactionalProducer`` and the ``KafkaProducerEvents``
+    /// `AsyncSequence` used for receiving message events.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public static func makeTransactionalProducerWithEvents(
+        config: KafkaTransactionalProducerConfiguration,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
+    ) async throws -> (KafkaTransactionalProducer, KafkaProducerEvents) {
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
+        
+        let transactionalProducer = try await KafkaTransactionalProducer(producer: producer, config: config)
+        
+        return (transactionalProducer, events)
+    }
+
+    /// Runs the given closure inside a transaction: the transaction is committed
+    /// when the closure returns and aborted when the closure throws.
+    public func withTransaction(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws {
+        let transaction = try KafkaTransaction(
+            client: try producer.client(),
+            producer: producer,
+            config: config)
+        
+        do { // need to think a little about how to abort the transaction here
+            try await body(transaction)
+            try await transaction.commit()
+        } catch { // FIXME: maybe catch AbortTransaction?
+            do {
+                try await transaction.abort()
+            } catch {
+                // FIXME: this is an inconsistent state;
+                // should we emit fatalError(..)
+                // or propagate the error as an exception with the isFatal flag?
+            }
+            throw error
+        }
+    }
+    
+    public func run() async throws {
+        try await producer.run()
+    }
+}
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
index dfa9ce5e..655e0926 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
@@ -13,7 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 import Crdkafka
-import BlockingCallWrapper
 import Dispatch
 import Logging
 
@@ -36,6 +36,12 @@ final class RDKafkaClient: Sendable {
 
     /// `librdkafka`'s main `rd_kafka_queue_t`.
     private let mainQueue: OpaquePointer
+    
+    /// Queue for blocking calls outside of the cooperative thread pool.
+    private var queue: DispatchQueue {
+        // global concurrent queue
+        .global(qos: .default)
+    }
 
     // Use factory method to initialize
     private init(
@@ -459,7 +465,11 @@ final class RDKafkaClient: Sendable {
     }
     
     func initTransactions(timeout: Duration) async throws {
-        let result = await forBlockingFunc {
+        let result = await performBlockingCall(queue: queue) {
             rd_kafka_init_transactions(self.kafkaHandle, timeout.totalMilliseconds)
         }
         
@@ -491,7 +501,7 @@ final class RDKafkaClient: Sendable {
                 
                 // TODO: actually it should be within some timeout (like transaction timeout or session timeout)
                 for idx in 0..<attempts {
-                    let error = await forBlockingFunc {
+                    let error = await performBlockingCall(queue: queue) {
                         rd_kafka_send_offsets_to_transaction(self.kafkaHandle, topicPartitionList,
                                                              consumerMetadata, timeout.totalMillisecondsOrMinusOne)
                     }
@@ -520,13 +530,13 @@ final class RDKafkaClient: Sendable {
                     let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
                     throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
                 }
+                throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
             }
-            throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
         }
     
     func abortTransaction(attempts: UInt64, timeout: Duration) async throws {
         for _ in 0..<attempts {
-            let error = await forBlockingFunc {
+            let error = await performBlockingCall(queue: queue) {
                 rd_kafka_abort_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
             }
             /* check if transaction abort is completed successfully  */
@@ -547,7 +557,7 @@ final class RDKafkaClient: Sendable {
     
     func commitTransaction(attempts: UInt64, timeout: Duration) async throws {
         for idx in 0..<attempts {
-            let error = await forBlockingFunc {
+            let error = await performBlockingCall(queue: queue) {
                 rd_kafka_commit_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
             }
             /* check if transaction is completed successfully  */
@@ -580,11 +590,11 @@ final class RDKafkaClient: Sendable {
 
 extension Duration {
     // Internal usage only: librdkafka accepts Int32 as timeouts
-    var totalMilliseconds: Int32 {
+    internal var totalMilliseconds: Int32 {
         return Int32(self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000)
     }
     
-    var totalMillisecondsOrMinusOne: Int32 {
+    internal var totalMillisecondsOrMinusOne: Int32 {
         return max(totalMilliseconds, -1)
     }
     
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
index 3713ef98..6e539402 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
@@ -35,9 +35,28 @@ struct RDKafkaTopicConfig {
     /// - Parameter value: The new value of the configuration property to be changed.
     /// - Throws: A ``KafkaError`` if setting the value failed.
     static func set(configPointer: OpaquePointer, key: String, value: String) throws {
+        var size: Int = RDKafkaClient.stringSize
+        let configValue = UnsafeMutablePointer<CChar>.allocate(capacity: size)
+        defer { configValue.deallocate() }
+
+        if RD_KAFKA_CONF_OK == rd_kafka_topic_conf_get(configPointer, key, configValue, &size) {
+            let sizeNoNullTerm = size - 1
+            let wasVal = String(unsafeUninitializedCapacity: sizeNoNullTerm) {
+                let buf = UnsafeRawBufferPointer(
+                    UnsafeMutableRawBufferPointer(
+                        start: configValue,
+                        count: sizeNoNullTerm))
+                _ = $0.initialize(from: buf)
+                return sizeNoNullTerm
+            }
+            if wasVal == value {
+                return // Values are equal; avoid setting so the config is not marked as modified
+            }
+        }
+        
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
         defer { errorChars.deallocate() }
-
+        
         let configResult = rd_kafka_topic_conf_set(
             configPointer,
             key,
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
index d14532fb..11552c0f 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
@@ -15,7 +15,7 @@
 import Crdkafka
 
 /// Swift wrapper type for `rd_kafka_topic_partition_list_t`.
-final class RDKafkaTopicPartitionList {
+public final class RDKafkaTopicPartitionList {
     private let _internal: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>
 
     /// Create a new topic+partition list.
diff --git a/Sources/SwiftKafka/Utilities/BlockingCall.swift b/Sources/SwiftKafka/Utilities/BlockingCall.swift
new file mode 100644
index 00000000..da0cd69b
--- /dev/null
+++ b/Sources/SwiftKafka/Utilities/BlockingCall.swift
@@ -0,0 +1,10 @@
+import Dispatch
+
+// Performs blocking calls outside of the cooperative thread pool.
+internal func performBlockingCall<T>(queue: DispatchQueue, body: @escaping () -> T) async -> T {
+    await withCheckedContinuation { continuation in
+        queue.async {
+            continuation.resume(returning: body())
+        }
+    }
+}
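+
+// Illustrative usage sketch (assuming `handle` is a valid `rd_kafka_t` handle):
+//
+//     let error = await performBlockingCall(queue: .global(qos: .default)) {
+//         rd_kafka_commit_transaction(handle, -1 /* use remaining transaction time */)
+//     }
+//
+// `error` is `nil` on success; otherwise it must be destroyed with `rd_kafka_error_destroy`.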
diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift
index 2dd56d25..8db61192 100644
--- a/Tests/IntegrationTests/SwiftKafkaTests.swift
+++ b/Tests/IntegrationTests/SwiftKafkaTests.swift
@@ -17,6 +17,7 @@ import NIOCore
 import ServiceLifecycle
 @testable import SwiftKafka
 import XCTest
+import Logging // TODO: remove
 
 // For testing locally on Mac, do the following:
 //
@@ -39,6 +40,7 @@ final class SwiftKafkaTests: XCTestCase {
     var bootstrapServer: KafkaConfiguration.Broker!
     var producerConfig: KafkaProducerConfiguration!
     var uniqueTestTopic: String!
+    var uniqueTestTopic2: String!
 
     override func setUpWithError() throws {
         self.bootstrapServer = KafkaConfiguration.Broker(host: self.kafkaHost, port: self.kafkaPort)
@@ -61,6 +63,7 @@ final class SwiftKafkaTests: XCTestCase {
             logger: .kafkaTest
         )
         self.uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000)
+        self.uniqueTestTopic2 = try client._createUniqueTopic(timeout: 10 * 1000)
     }
 
     override func tearDownWithError() throws {
@@ -78,6 +81,7 @@ final class SwiftKafkaTests: XCTestCase {
             logger: .kafkaTest
         )
         try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000)
+        try client._deleteTopic(self.uniqueTestTopic2, timeout: 10 * 1000)
 
         self.bootstrapServer = nil
         self.producerConfig = nil
@@ -465,10 +469,11 @@ final class SwiftKafkaTests: XCTestCase {
                     receivedDeliveryReports.insert(deliveryReport)
                 }
             default:
-                break // Ignore any other events
+                continue // Ignore any other events
             }
 
-            if receivedDeliveryReports.count >= 2 {
+            if receivedDeliveryReports.count >= messages.count {
                 break
             }
         }
@@ -489,4 +494,120 @@ final class SwiftKafkaTests: XCTestCase {
             XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
         }
     }
+    
+    func testProduceAndConsumeWithTransaction() async throws {
+        let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: 10)
+        
+        self.producerConfig.debug = [.all]
+
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(config: self.producerConfig, logger: .kafkaTest)
+        
+        var transactionConfigProducer = self.producerConfig!
+        transactionConfigProducer.transactionalId = "234567"
+        transactionConfigProducer.bootstrapServers = [self.bootstrapServer]
+
+        var transactionalProducerConfig = KafkaTransactionalProducerConfiguration(
+            transactionalId: "1234",
+            producerConfiguration: transactionConfigProducer)
+
+        transactionalProducerConfig.transactionsTimeout = .seconds(20)
+        let transactionalProducer = try await KafkaTransactionalProducer(config: transactionalProducerConfig, logger: .kafkaTest)
+
+        let makeConsumerConfig = { (topic: String) -> KafkaConsumerConfiguration in
+            var consumerConfig = KafkaConsumerConfiguration(
+                consumptionStrategy: .group(id: "subscription-test-group-id", topics: [topic])
+            )
+            consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
+            consumerConfig.bootstrapServers = [self.bootstrapServer]
+            consumerConfig.broker.addressFamily = .v4
+            consumerConfig.enableAutoCommit = false
+            return consumerConfig
+        }
+        
+        let consumer = try KafkaConsumer(
+            config: makeConsumerConfig(uniqueTestTopic),
+            logger: .kafkaTest
+        )
+        
+        let consumerAfterTransaction = try KafkaConsumer(
+            config: makeConsumerConfig(uniqueTestTopic2),
+            logger: .kafkaTest
+        )
+
+        let serviceGroup = ServiceGroup(
+            services: [
+                producer,
+                consumer,
+                transactionalProducer,
+                consumerAfterTransaction
+            ],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: .kafkaTest
+        )
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            // Producer Task
+            group.addTask {
+                try await Self.sendAndAcknowledgeMessages(
+                    producer: producer,
+                    events: events,
+                    messages: testMessages
+                )
+                print("produced all messages")
+            }
+
+            // Consumer Task
+            group.addTask {
+                var count = 0
+                for try await messageResult in consumer.messages {
+                    // `messages` already yields plain consumer messages; no unwrapping needed
+                    let message = messageResult
+                    count += 1
+                    try await transactionalProducer.withTransaction { transaction in
+                        let newMessage = KafkaProducerMessage(
+                            topic: self.uniqueTestTopic2,
+                            value: message.value.description + "_updated")
+                        try transaction.send(newMessage)
+                        let partitionlist = RDKafkaTopicPartitionList()
+                        partitionlist.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: Int64(message.offset))
+                        try await transaction.send(offsets: partitionlist, forConsumer: consumer)
+                    }
+
+                    if count >= testMessages.count {
+                        break
+                    }
+                }
+                print("Changed all messages \(count)")
+            }
+            
+            group.addTask {
+                var count = 0
+                for try await messageAfterTransaction in consumerAfterTransaction.messages {
+                    print("[\(count + 1)] Message after transaction received \(messageAfterTransaction)") // TODO: change
+
+                    count += 1
+                    if count >= testMessages.count {
+                        break
+                    }
+                }
+                print("Received all changed messages \(count)")
+            }
+
+            // Wait for Producer Task and Consumer Task to complete
+            try await group.next()
+            try await group.next()
+            try await group.next()
+
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+    }
 }
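
Note on the test's transactional step: committing the consumer's offsets through the transaction, rather than through the consumer itself, is what makes the consume-transform-produce cycle atomic. Below is a minimal sketch of that pattern against the APIs introduced in this series (the helper name and parameters are illustrative); be aware that Kafka conventionally expects the committed value to be the position of the next message to consume, i.e. the last consumed offset + 1:

    // Sketch only: transform one consumed message and commit its offset
    // atomically within the same transaction.
    func forward(
        _ message: KafkaConsumerMessage,
        from sourceTopic: String,
        to destinationTopic: String,
        producer: KafkaTransactionalProducer,
        consumer: KafkaConsumer
    ) async throws {
        try await producer.withTransaction { transaction in
            // The produced message becomes visible only once the transaction commits.
            try transaction.send(KafkaProducerMessage(topic: destinationTopic, value: message.value))

            // Commit the consumer's progress as part of the transaction.
            let offsets = RDKafkaTopicPartitionList()
            offsets.setOffset(
                topic: sourceTopic,
                partition: message.partition,
                offset: Int64(message.offset) + 1 // next position to consume
            )
            try await transaction.send(offsets: offsets, forConsumer: consumer)
        }
    }
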
diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift
index 1c22131f..e6bd9290 100644
--- a/Tests/IntegrationTests/Utilities.swift
+++ b/Tests/IntegrationTests/Utilities.swift
@@ -20,7 +20,7 @@ import Logging
 extension Logger {
     static var kafkaTest: Logger {
         var logger = Logger(label: "kafka.test")
-        logger.logLevel = .info
+        logger.logLevel = .debug
         return logger
     }
 }

From ce702f31deceb6d2895dcd31e0d52d6b1ab18f05 Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Mon, 31 Jul 2023 17:06:05 +0300
Subject: [PATCH 4/5] use shared config with producer

---
 .../KafkaProducerConfiguration.swift          |  88 +----------
 .../KafkaProducerSharedProperties.swift       | 142 ++++++++++++++++++
 ...kaTransactionalProducerConfiguration.swift | 134 +++++++++++++++++
 Sources/SwiftKafka/KafkaConsumer.swift        |   6 +-
 Sources/SwiftKafka/KafkaError.swift           |   8 +-
 Sources/SwiftKafka/KafkaProducer.swift        |  41 +++--
 Sources/SwiftKafka/KafkaTransaction.swift     |  23 ++-
 .../KafkaTransactionalProducer.swift          |  28 ++--
 .../SwiftKafka/RDKafka/RDKafkaClient.swift    | 106 ++++++-------
 .../RDKafka/RDKafkaTopicConfig.swift          |   9 +-
 .../RDKafka/RDKafkaTopicPartitionList.swift   |   4 +-
 Tests/IntegrationTests/SwiftKafkaTests.swift  |  33 ++--
 12 files changed, 415 insertions(+), 207 deletions(-)
 create mode 100644 Sources/SwiftKafka/Configuration/KafkaProducerSharedProperties.swift
 create mode 100644 Sources/SwiftKafka/Configuration/KafkaTransactionalProducerConfiguration.swift

diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
index 01f9d367..8037176e 100644
--- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -97,8 +97,6 @@ public struct KafkaProducerConfiguration {
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
-    
-    internal var transactionalId: String?
 
     public init() {}
 }
@@ -107,54 +105,7 @@ public struct KafkaProducerConfiguration {
 
 extension KafkaProducerConfiguration {
     internal var dictionary: [String: String] {
-        var resultDict: [String: String] = [:]
-
-        resultDict["enable.idempotence"] = String(self.enableIdempotence)
-        if let transactionalId {
-            resultDict["transactional.id"] = transactionalId
-            resultDict["transaction.timeout.ms"] = "60000"
-            resultDict["message.timeout.ms"] = "60000"
-            
-        }
-        resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
-        resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
-        resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
-        resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
-        resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)
-
-        resultDict["client.id"] = self.clientID
-        resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
-        resultDict["message.max.bytes"] = String(self.message.maxBytes)
-        resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
-        resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
-        resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
-        resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
-        resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
-        resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
-        resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
-        resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
-        resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
-        if !self.debug.isEmpty {
-            resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
-        }
-        resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
-        resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
-        resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
-        resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
-        resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
-        resultDict["socket.max.fails"] = String(self.socket.maxFails)
-        resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
-        resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
-        resultDict["broker.address.family"] = self.broker.addressFamily.description
-        resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
-        resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)
-
-        // Merge with SecurityProtocol configuration dictionary
-        resultDict.merge(self.securityProtocol.dictionary) { _, _ in
-            fatalError("securityProtocol and \(#file) should not have duplicate keys")
-        }
-
-        return resultDict
+        sharedPropsDictionary
     }
 }
 
@@ -166,6 +117,8 @@ extension KafkaProducerConfiguration: Hashable {}
 
 extension KafkaProducerConfiguration: Sendable {}
 
+extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
+
 // MARK: - KafkaConfiguration + Producer Additions
 
 extension KafkaConfiguration {
@@ -194,38 +147,3 @@ extension KafkaConfiguration {
         }
     }
 }
-
-// FIXME: should we really duplicate `KafkaProducerConfiguration`
-// FIXME: after public api updated?
-public struct KafkaTransactionalProducerConfiguration {
-    var transactionalId: String
-    var transactionsTimeout: Duration
-    
-    var producerConfiguration: KafkaProducerConfiguration {
-        set {
-            self.producerConfiguration_ = newValue
-        }
-        get {
-            var conf = self.producerConfiguration_
-            conf.transactionalId = self.transactionalId
-            conf.enableIdempotence = true
-            conf.maxInFlightRequestsPerConnection = min(conf.maxInFlightRequestsPerConnection, 5)
-            return conf
-        }
-    }
-    
-    private var producerConfiguration_: KafkaProducerConfiguration = .init()
-    
-    public init(transactionalId: String, transactionsTimeout: Duration = .kafkaUntilEndOfTransactionTimeout, producerConfiguration: KafkaProducerConfiguration = .init()) {
-        self.transactionalId = transactionalId
-        self.transactionsTimeout = transactionsTimeout
-        self.producerConfiguration = producerConfiguration
-    }
-}
-// MARK: - KafkaProducerConfiguration + Hashable
-
-extension KafkaTransactionalProducerConfiguration: Hashable {}
-
-// MARK: - KafkaProducerConfiguration + Sendable
-
-extension KafkaTransactionalProducerConfiguration: Sendable {}
diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerSharedProperties.swift b/Sources/SwiftKafka/Configuration/KafkaProducerSharedProperties.swift
new file mode 100644
index 00000000..7488c58f
--- /dev/null
+++ b/Sources/SwiftKafka/Configuration/KafkaProducerSharedProperties.swift
@@ -0,0 +1,142 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+internal protocol KafkaProducerSharedProperties: Sendable, Hashable {
+    // MARK: - SwiftKafka-specific Config properties
+
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events are consumed.
+    /// Default: `.milliseconds(100)`
+    var pollInterval: Duration { get }
+
+    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
+    /// Default: `10000`
+    var flushTimeoutMilliseconds: Int { get }
+
+    // MARK: - Producer-specific Config Properties
+
+    /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.
+    /// Default: `false`
+    var enableIdempotence: Bool { get }
+
+    /// Producer queue options.
+    var queue: KafkaConfiguration.QueueOptions { get }
+
+    /// How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
+    /// Default: `2_147_483_647`
+    var messageSendMaxRetries: Int { get }
+
+    /// Allow automatic topic creation on the broker when producing to non-existent topics.
+    /// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
+    /// Default: `true`
+    var allowAutoCreateTopics: Bool { get }
+
+    // MARK: - Common Client Config Properties
+
+    /// Client identifier.
+    /// Default: `"rdkafka"`
+    var clientID: String { get }
+
+    /// Initial list of brokers.
+    /// Default: `[]`
+    var bootstrapServers: [KafkaConfiguration.Broker] { get }
+
+    /// Message options.
+    var message: KafkaConfiguration.MessageOptions { get }
+
+    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
+    /// Default: `100_000_000`
+    var receiveMessageMaxBytes: Int { get }
+
+    /// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.
+    /// Default: `1_000_000`
+    var maxInFlightRequestsPerConnection: Int { get }
+
+    /// Metadata cache max age.
+    /// Default: `900_000`
+    var metadataMaxAgeMilliseconds: Int { get }
+
+    /// Topic metadata options.
+    var topicMetadata: KafkaConfiguration.TopicMetadataOptions { get }
+
+    /// Topic denylist.
+    /// Default: `[]`
+    var topicDenylist: [String] { get }
+
+    /// Debug options.
+    /// Default: `[]`
+    var debug: [KafkaConfiguration.DebugOption] { get }
+
+    /// Socket options.
+    var socket: KafkaConfiguration.SocketOptions { get }
+
+    /// Broker options.
+    var broker: KafkaConfiguration.BrokerOptions { get }
+
+    /// Reconnect options.
+    var reconnect: KafkaConfiguration.ReconnectOptions { get }
+
+    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
+    /// Default: `.plaintext`
+    var securityProtocol: KafkaConfiguration.SecurityProtocol { get }
+
+    var dictionary: [String: String] { get }
+}
+
+extension KafkaProducerSharedProperties {
+    internal var sharedPropsDictionary: [String: String] {
+        var resultDict: [String: String] = [:]
+
+        resultDict["enable.idempotence"] = String(self.enableIdempotence)
+        resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
+        resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
+        resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
+        resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
+        resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)
+
+        resultDict["client.id"] = self.clientID
+        resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
+        resultDict["message.max.bytes"] = String(self.message.maxBytes)
+        resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
+        resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
+        resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
+        resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
+        resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
+        resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
+        resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
+        resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
+        resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
+        if !self.debug.isEmpty {
+            resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
+        }
+        resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
+        resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
+        resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
+        resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
+        resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
+        resultDict["socket.max.fails"] = String(self.socket.maxFails)
+        resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
+        resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
+        resultDict["broker.address.family"] = self.broker.addressFamily.description
+        resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
+        resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)
+
+        // Merge with SecurityProtocol configuration dictionary
+        resultDict.merge(self.securityProtocol.dictionary) { _, _ in
+            fatalError("securityProtocol and \(#file) should not have duplicate keys")
+        }
+
+        return resultDict
+    }
+}
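
The protocol above centralizes the property-to-librdkafka-key mapping so that every conforming configuration yields the same base dictionary. A self-contained sketch of the same protocol-plus-extension pattern, reduced to two keys and with illustrative type names:

    // Each configuration exposes its shared properties through a protocol...
    protocol SharedClientProperties {
        var clientID: String { get }
        var bootstrapServers: [String] { get }
    }

    // ...and a single extension flattens them into librdkafka-style keys.
    extension SharedClientProperties {
        var sharedDictionary: [String: String] {
            [
                "client.id": clientID,
                "bootstrap.servers": bootstrapServers.joined(separator: ","),
            ]
        }
    }

    struct ProducerConfig: SharedClientProperties {
        var clientID = "rdkafka"
        var bootstrapServers: [String] = []
        var dictionary: [String: String] { sharedDictionary }
    }

    struct TransactionalProducerConfig: SharedClientProperties {
        var clientID = "rdkafka"
        var bootstrapServers: [String] = []
        var transactionalID: String
        var dictionary: [String: String] {
            var dict = sharedDictionary
            dict["transactional.id"] = transactionalID // transaction-only key
            return dict
        }
    }

Transaction-specific keys stay out of the shared extension, exactly as transactional.id and transaction.timeout.ms do in the file that follows.
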
diff --git a/Sources/SwiftKafka/Configuration/KafkaTransactionalProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaTransactionalProducerConfiguration.swift
new file mode 100644
index 00000000..f5cca7c2
--- /dev/null
+++ b/Sources/SwiftKafka/Configuration/KafkaTransactionalProducerConfiguration.swift
@@ -0,0 +1,134 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+// FIXME: should we really duplicate `KafkaProducerConfiguration`,
+// FIXME: or revisit this once the public API is updated?
+public struct KafkaTransactionalProducerConfiguration {
+    // MARK: - SwiftKafka-specific Config properties
+
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events are consumed.
+    /// Default: `.milliseconds(100)`
+    public var pollInterval: Duration = .milliseconds(100)
+
+    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
+    /// Default: `10000`
+    public var flushTimeoutMilliseconds: Int = 10000 {
+        didSet {
+            precondition(
+                0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
+                "Flush timeout outside of valid range \(0...Int32.max)"
+            )
+        }
+    }
+
+    // MARK: - Producer-specific Config Properties
+
+    /// The producer ensures that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.
+    /// Always `true` for a transactional producer.
+    internal let enableIdempotence: Bool = true
+
+    /// Producer queue options.
+    public var queue: KafkaConfiguration.QueueOptions = .init()
+
+    /// How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
+    /// Default: `2_147_483_647`
+    public var messageSendMaxRetries: Int = 2_147_483_647
+
+    /// Allow automatic topic creation on the broker when producing to non-existent topics.
+    /// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
+    /// Default: `true`
+    public var allowAutoCreateTopics: Bool = true
+
+    // MARK: - Common Client Config Properties
+
+    /// Client identifier.
+    /// Default: `"rdkafka"`
+    public var clientID: String = "rdkafka"
+
+    /// Initial list of brokers.
+    /// Default: `[]`
+    public var bootstrapServers: [KafkaConfiguration.Broker] = []
+
+    /// Message options.
+    public var message: KafkaConfiguration.MessageOptions = .init()
+
+    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
+    /// Default: `100_000_000`
+    public var receiveMessageMaxBytes: Int = 100_000_000
+
+    /// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests. A transactional producer must not exceed 5 in-flight requests.
+    /// Default: `5`
+    public var maxInFlightRequestsPerConnection: Int = 5 {
+        didSet {
+            precondition(
+                0...5 ~= self.maxInFlightRequestsPerConnection,
+                "Transactional producer can have no more than 5 in flight requests"
+            )
+        }
+    }
+
+    /// Metadata cache max age.
+    /// Default: `900_000`
+    public var metadataMaxAgeMilliseconds: Int = 900_000
+
+    /// Topic metadata options.
+    public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()
+
+    /// Topic denylist.
+    /// Default: `[]`
+    public var topicDenylist: [String] = []
+
+    /// Debug options.
+    /// Default: `[]`
+    public var debug: [KafkaConfiguration.DebugOption] = []
+
+    /// Socket options.
+    public var socket: KafkaConfiguration.SocketOptions = .init()
+
+    /// Broker options.
+    public var broker: KafkaConfiguration.BrokerOptions = .init()
+
+    /// Reconnect options.
+    public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
+
+    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
+    /// Default: `.plaintext`
+    public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
+
+    /// The transactional ID used to identify this producer for transactions (maps to `transactional.id`).
+    var transactionalId: String
+
+    /// Maximum time a transaction may remain open before timing out (maps to `transaction.timeout.ms`).
+    /// Default: `.seconds(60)`, matching the socket timeout.
+    var transactionsTimeout: Duration = .seconds(60) // TODO: add didSet validation
+
+    public init(transactionalId: String) {
+        self.transactionalId = transactionalId
+    }
+}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Hashable
+
+extension KafkaTransactionalProducerConfiguration: Hashable {}
+
+// MARK: - KafkaTransactionalProducerConfiguration + Sendable
+
+extension KafkaTransactionalProducerConfiguration: Sendable {}
+
+extension KafkaTransactionalProducerConfiguration: KafkaProducerSharedProperties {
+    internal var dictionary: [String: String] {
+        var resultDict: [String: String] = sharedPropsDictionary
+        resultDict["transactional.id"] = self.transactionalId
+        resultDict["transaction.timeout.ms"] = String(self.transactionsTimeout.totalMilliseconds)
+        return resultDict
+    }
+}
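
As a usage sketch (values are hypothetical, and `dictionary` is internal, so this only type-checks inside the module):

    var config = KafkaTransactionalProducerConfiguration(transactionalId: "my-transactional-id")
    config.bootstrapServers = [KafkaConfiguration.Broker(host: "localhost", port: 9092)]
    config.maxInFlightRequestsPerConnection = 5 // precondition: must stay within 0...5

    // config.dictionary now contains, alongside the shared keys:
    //   "transactional.id"       -> "my-transactional-id"
    //   "transaction.timeout.ms" -> "60000" (the .seconds(60) default)
    //   "enable.idempotence"     -> "true"  (fixed for transactional producers)
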
diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift
index 7fdd1db9..23f99db3 100644
--- a/Sources/SwiftKafka/KafkaConsumer.swift
+++ b/Sources/SwiftKafka/KafkaConsumer.swift
@@ -408,9 +408,9 @@ public final class KafkaConsumer: Sendable, Service {
             }
         }
     }
-    
+
     func client() throws -> RDKafkaClient {
-        return try stateMachine.withLockedValue { try $0.client() }
+        return try self.stateMachine.withLockedValue { try $0.client() }
     }
 }
 
@@ -659,7 +659,7 @@ extension KafkaConsumer {
                 return nil
             }
         }
-        
+
         func client() throws -> RDKafkaClient {
             switch self.state {
             case .uninitialized:
diff --git a/Sources/SwiftKafka/KafkaError.swift b/Sources/SwiftKafka/KafkaError.swift
index ca3b5cf2..0e13cdb9 100644
--- a/Sources/SwiftKafka/KafkaError.swift
+++ b/Sources/SwiftKafka/KafkaError.swift
@@ -133,7 +133,7 @@ public struct KafkaError: Error, CustomStringConvertible {
             )
         )
     }
-    
+
     static func transactionAborted(
         reason: String, file: String = #fileID, line: UInt = #line
     ) -> KafkaError {
@@ -143,7 +143,7 @@ public struct KafkaError: Error, CustomStringConvertible {
             )
         )
     }
-    
+
     static func transactionIncomplete(
         reason: String, file: String = #fileID, line: UInt = #line
     ) -> KafkaError {
@@ -153,7 +153,7 @@ public struct KafkaError: Error, CustomStringConvertible {
             )
         )
     }
-    
+
     static func transactionOutOfAttempts(
         numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line
     ) -> KafkaError {
@@ -233,7 +233,7 @@ extension KafkaError {
         let file: String
 
         let line: UInt
-        
+
         let isFatal: Bool
 
         fileprivate init(
diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift
index 6133c871..0aac720f 100644
--- a/Sources/SwiftKafka/KafkaProducer.swift
+++ b/Sources/SwiftKafka/KafkaProducer.swift
@@ -80,8 +80,10 @@ public final class KafkaProducer: Service, Sendable {
     /// State of the ``KafkaProducer``.
     private let stateMachine: NIOLockedValueBox<StateMachine>
 
-    /// The configuration object of the producer client.
-    private let config: KafkaProducerConfiguration
+    /// Configured poll interval
+    private let pollInterval: Duration
+    /// Configured flush timeout
+    private let flushTimeout: Duration
     /// Topic configuration that is used when a new topic has to be created by the producer.
     private let topicConfig: KafkaTopicConfiguration
 
@@ -94,11 +96,16 @@ public final class KafkaProducer: Service, Sendable {
     /// - Throws: A ``KafkaError`` if initializing the producer failed.
     private init(
         stateMachine: NIOLockedValueBox<KafkaProducer.StateMachine>,
-        config: KafkaProducerConfiguration,
+        config: any KafkaProducerSharedProperties,
         topicConfig: KafkaTopicConfiguration
     ) throws {
         self.stateMachine = stateMachine
-        self.config = config
+        self.pollInterval = config.pollInterval
+        precondition(
+            0...Int(Int32.max) ~= config.flushTimeoutMilliseconds,
+            "Flush timeout outside of valid range \(0...Int32.max)"
+        )
+        self.flushTimeout = .milliseconds(config.flushTimeoutMilliseconds)
         self.topicConfig = topicConfig
     }
 
@@ -115,6 +122,14 @@ public final class KafkaProducer: Service, Sendable {
         config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
         topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
         logger: Logger
+    ) throws {
+        try self.init(config: config as (any KafkaProducerSharedProperties), topicConfig: topicConfig, logger: logger)
+    }
+
+    internal convenience init(
+        config: any KafkaProducerSharedProperties,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
     ) throws {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
@@ -156,6 +171,14 @@ public final class KafkaProducer: Service, Sendable {
         config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
         topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
         logger: Logger
+    ) throws -> (KafkaProducer, KafkaProducerEvents) {
+        return try self.makeProducerWithEvents(config: config as (any KafkaProducerSharedProperties), topicConfig: topicConfig, logger: logger)
+    }
+
+    internal static func makeProducerWithEvents(
+        config: any KafkaProducerSharedProperties,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
     ) throws -> (KafkaProducer, KafkaProducerEvents) {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
@@ -215,13 +238,9 @@ public final class KafkaProducer: Service, Sendable {
                     // Ignore YieldResult as we don't support back pressure in KafkaProducer
                     _ = source?.yield(producerEvent)
                 }
-                try await Task.sleep(for: self.config.pollInterval)
+                try await Task.sleep(for: self.pollInterval)
             case .flushFinishSourceAndTerminatePollLoop(let client, let source):
-                precondition(
-                    0...Int(Int32.max) ~= self.config.flushTimeoutMilliseconds,
-                    "Flush timeout outside of valid range \(0...Int32.max)"
-                )
-                try await client.flush(timeoutMilliseconds: Int32(self.config.flushTimeoutMilliseconds))
+                try await client.flush(timeoutMilliseconds: self.flushTimeout.totalMilliseconds)
                 source?.finish()
                 return
             case .terminatePollLoop:
@@ -453,7 +472,7 @@ extension KafkaProducer {
                 break
             }
         }
-        
+
         func client() throws -> RDKafkaClient {
             switch self.state {
             case .uninitialized:
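
Instead of carrying the whole (now existential) configuration, the producer validates flushTimeoutMilliseconds once in its initializer and stores only the two derived values it needs, so the poll loop no longer re-checks the precondition on every flush. A minimal, self-contained sketch of that validate-once idea, with illustrative names:

    // Sketch: validate a configuration value at construction time and
    // keep only the derived form around.
    struct PollSettings {
        let pollInterval: Duration
        let flushTimeout: Duration

        init(pollInterval: Duration, flushTimeoutMilliseconds: Int) {
            precondition(
                (0...Int(Int32.max)).contains(flushTimeoutMilliseconds),
                "Flush timeout outside of valid range \(0...Int32.max)"
            )
            self.pollInterval = pollInterval
            // Stored as Duration, so later uses need no re-validation or conversion.
            self.flushTimeout = .milliseconds(flushTimeoutMilliseconds)
        }
    }
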
diff --git a/Sources/SwiftKafka/KafkaTransaction.swift b/Sources/SwiftKafka/KafkaTransaction.swift
index cd1969df..890cc8e2 100644
--- a/Sources/SwiftKafka/KafkaTransaction.swift
+++ b/Sources/SwiftKafka/KafkaTransaction.swift
@@ -3,18 +3,15 @@
 public final class KafkaTransaction {
     let client: RDKafkaClient
     let producer: KafkaProducer
-    let config: KafkaTransactionalProducerConfiguration
-    
-    init(client: RDKafkaClient, producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) throws {
+
+    init(client: RDKafkaClient, producer: KafkaProducer) throws {
         self.client = client
         self.producer = producer
-        self.config = config
 
         try client.beginTransaction()
     }
-    
-    deinit {
-    }
+
+    deinit {}
 
     public func send(
         offsets: RDKafkaTopicPartitionList,
@@ -24,20 +21,20 @@ public final class KafkaTransaction {
     ) async throws {
         let consumerClient = try consumer.client()
         try await consumerClient.withKafkaHandlePointer {
-            try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
+            try await self.client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
         }
     }
-    
+
     @discardableResult
     public func send<Key, Value>(_ message: KafkaProducerMessage<Key, Value>) throws -> KafkaProducerMessageID {
         try self.producer.send(message)
     }
-    
+
     func commit() async throws {
-        try await client.commitTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+        try await self.client.commitTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
     }
-    
+
     func abort() async throws {
-        try await client.abortTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+        try await self.client.abortTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
     }
 }
diff --git a/Sources/SwiftKafka/KafkaTransactionalProducer.swift b/Sources/SwiftKafka/KafkaTransactionalProducer.swift
index d35c8f26..61139ba2 100644
--- a/Sources/SwiftKafka/KafkaTransactionalProducer.swift
+++ b/Sources/SwiftKafka/KafkaTransactionalProducer.swift
@@ -3,11 +3,9 @@ import ServiceLifecycle
 
 public final class KafkaTransactionalProducer: Service, Sendable {
     let producer: KafkaProducer
-    let config: KafkaTransactionalProducerConfiguration
-    
+
     private init(producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) async throws {
         self.producer = producer
-        self.config = config
         let client = try producer.client()
         try await client.initTransactions(timeout: config.transactionsTimeout)
     }
@@ -26,10 +24,10 @@ public final class KafkaTransactionalProducer: Service, Sendable {
         topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
         logger: Logger
     ) async throws {
-        let producer = try KafkaProducer(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
+        let producer = try KafkaProducer(config: config, topicConfig: topicConfig, logger: logger)
         try await self.init(producer: producer, config: config)
     }
-    
+
     /// Initialize a new ``KafkaTransactionalProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
     ///
     /// Use the asynchronous sequence to consume events.
@@ -48,10 +46,14 @@ public final class KafkaTransactionalProducer: Service, Sendable {
         topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
         logger: Logger
     ) async throws -> (KafkaTransactionalProducer, KafkaProducerEvents) {
-        let (producer, events) = try KafkaProducer.makeProducerWithEvents(config: config.producerConfiguration, topicConfig: topicConfig, logger: logger)
-        
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(
+            config: config,
+            topicConfig: topicConfig,
+            logger: logger
+        )
+
         let transactionalProducer = try await KafkaTransactionalProducer(producer: producer, config: config)
-        
+
         return (transactionalProducer, events)
     }
 
@@ -59,9 +61,9 @@ public final class KafkaTransactionalProducer: Service, Sendable {
     public func withTransaction(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws {
         let transaction = try KafkaTransaction(
             client: try producer.client(),
-            producer: producer,
-            config: config)
-        
+            producer: self.producer
+        )
+
         do { // TODO: think through how the transaction should be aborted on failure
             try await body(transaction)
             try await transaction.commit()
@@ -76,8 +78,8 @@ public final class KafkaTransactionalProducer: Service, Sendable {
             throw error
         }
     }
-    
+
     public func run() async throws {
-        try await producer.run()
+        try await self.producer.run()
     }
 }
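
withTransaction commits when the closure returns normally and attempts an abort when it throws, so callers only describe the transactional work itself. A usage sketch, assuming `config` is a KafkaTransactionalProducerConfiguration, `message` a KafkaProducerMessage, and `logger` set up as in the tests:

    let transactionalProducer = try await KafkaTransactionalProducer(
        config: config,
        logger: logger
    )

    do {
        try await transactionalProducer.withTransaction { transaction in
            try transaction.send(message) // enqueued as part of the transaction
            // throwing from here aborts the transaction instead of committing it
        }
        // reaching this point means the transaction committed
    } catch {
        // surfaced as a KafkaError, e.g. transactionAborted or transactionIncomplete
        logger.error("transaction failed: \(error)")
    }
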
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
index 655e0926..313d4d99 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift
@@ -14,7 +14,6 @@
 
 import Crdkafka
 import Dispatch
-import Dispatch
 import Logging
 
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
@@ -36,7 +35,7 @@ final class RDKafkaClient: Sendable {
 
     /// `librdkafka`'s main `rd_kafka_queue_t`.
     private let mainQueue: OpaquePointer
-    
+
     /// Queue for blocking calls outside of cooperative thread pool
     private var queue: DispatchQueue {
         // global concurrent queue
@@ -455,7 +454,7 @@ final class RDKafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         return try body(self.kafkaHandle)
     }
-    
+
     /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle with async closure.
     /// - Warning: Do not escape the pointer from the closure for later use.
     /// - Parameter body: The closure will use the Kafka handle pointer.
@@ -463,23 +462,23 @@ final class RDKafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
         return try await body(self.kafkaHandle)
     }
-    
+
     func initTransactions(timeout: Duration) async throws {
-        rd_kafka_conf_set_dr_msg_cb(self.kafkaHandle, {_,_,_ in
+        rd_kafka_conf_set_dr_msg_cb(self.kafkaHandle) { _, _, _ in
             print("test")
-        })
-        
+        }
+
         let result = await performBlockingCall(queue: queue) {
             rd_kafka_init_transactions(self.kafkaHandle, timeout.totalMilliseconds)
         }
-        
+
         if result != nil {
             let code = rd_kafka_error_code(result)
             rd_kafka_error_destroy(result)
             throw KafkaError.rdKafkaError(wrapping: code)
         }
     }
-    
+
     func beginTransaction() throws {
         let result = rd_kafka_begin_transaction(kafkaHandle)
         if result != nil {
@@ -488,52 +487,53 @@ final class RDKafkaClient: Sendable {
             throw KafkaError.rdKafkaError(wrapping: code)
         }
     }
-    
+
     func send(
         attempts: UInt64,
         offsets: RDKafkaTopicPartitionList,
         forConsumerKafkaHandle consumer: OpaquePointer,
-        timeout: Duration) async throws {
-            try await offsets.withListPointer { topicPartitionList in
-                
-                let consumerMetadata = rd_kafka_consumer_group_metadata(consumer)
-                defer { rd_kafka_consumer_group_metadata_destroy(consumerMetadata) }
-                
-                // TODO: actually it should be withing some timeout (like transaction timeout or session timeout)
-                for idx in 0..<attempts {
-                    let error = await performBlockingCall(queue: queue) {
-                        rd_kafka_send_offsets_to_transaction(self.kafkaHandle, topicPartitionList,
-                                                             consumerMetadata, timeout.totalMillisecondsOrMinusOne)
-                    }
-                    
-                    /* check if offset commit is completed successfully  */
-                    if error == nil {
-                        return
-                    }
-                    defer { rd_kafka_error_destroy(error) }
-                    
-                    /* check if offset commit is retriable */
-                    if rd_kafka_error_is_retriable(error) == 1 {
-                        continue
-                    }
-                    
-                    /* check if transaction need to be aborted */
-                    if rd_kafka_error_txn_requires_abort(error) == 1 {
-                        do {
-                            try await abortTransaction(attempts: attempts - idx, timeout: timeout)
-                            throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
-                        } catch {
-                            throw KafkaError.transactionIncomplete(
-                                reason: "Could not complete or abort transaction with error \(error)")
-                        }
+        timeout: Duration
+    ) async throws {
+        try await offsets.withListPointer { topicPartitionList in
+
+            let consumerMetadata = rd_kafka_consumer_group_metadata(consumer)
+            defer { rd_kafka_consumer_group_metadata_destroy(consumerMetadata) }
+
+            // TODO: this should actually run within some timeout (like the transaction timeout or session timeout)
+            for idx in 0..<attempts {
+                let error = await performBlockingCall(queue: queue) {
+                    rd_kafka_send_offsets_to_transaction(self.kafkaHandle, topicPartitionList,
+                                                         consumerMetadata, timeout.totalMillisecondsOrMinusOne)
+                }
+
+                /* check if offset commit is completed successfully */
+                if error == nil {
+                    return
+                }
+                defer { rd_kafka_error_destroy(error) }
+
+                /* check if offset commit is retriable */
+                if rd_kafka_error_is_retriable(error) == 1 {
+                    continue
+                }
+
+                /* check if transaction needs to be aborted */
+                if rd_kafka_error_txn_requires_abort(error) == 1 {
+                    do {
+                        try await self.abortTransaction(attempts: attempts - idx, timeout: timeout)
+                        throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
+                    } catch {
+                        throw KafkaError.transactionIncomplete(
+                            reason: "Could not complete or abort transaction with error \(error)")
                     }
-                    let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
-                    throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
                 }
-                throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+                let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
+                throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
             }
+            throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
         }
-    
+    }
+
     func abortTransaction(attempts: UInt64, timeout: Duration) async throws {
         for _ in 0..<attempts {
             let error = await performBlockingCall(queue: queue) {
@@ -554,7 +554,7 @@ final class RDKafkaClient: Sendable {
         }
         throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
     }
-    
+
     func commitTransaction(attempts: UInt64, timeout: Duration) async throws {
         for idx in 0..<attempts {
             let error = await performBlockingCall(queue: queue) {
@@ -569,11 +569,11 @@ final class RDKafkaClient: Sendable {
                 continue
             }
             defer { rd_kafka_error_destroy(error) }
-            
+
             /* check if transaction needs to be aborted */
             if rd_kafka_error_txn_requires_abort(error) == 1 {
                 do {
-                    try await abortTransaction(attempts: attempts - idx, timeout: timeout)
+                    try await self.abortTransaction(attempts: attempts - idx, timeout: timeout)
                     throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
                 } catch {
                     throw KafkaError.transactionIncomplete(
@@ -593,11 +593,11 @@ extension Duration {
     internal var totalMilliseconds: Int32 {
         return Int32(self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000)
     }
-    
+
     internal var totalMillisecondsOrMinusOne: Int32 {
-        return max(totalMilliseconds, -1)
+        return max(self.totalMilliseconds, -1)
     }
-    
+
     public static var kafkaUntilEndOfTransactionTimeout: Duration = .milliseconds(-1)
     public static var kafkaNoWaitTransaction: Duration = .zero
 }
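
The Duration helpers above flatten the seconds/attoseconds components into a single Int32 millisecond count, with -1 reserved as librdkafka's "wait until end of transaction" sentinel. A quick worked check of that arithmetic:

    // 1.5 s has components(seconds: 1, attoseconds: 5 * 10^17);
    // one millisecond is 10^15 attoseconds.
    let d = Duration.milliseconds(1500)
    let ms = Int32(d.components.seconds * 1000 + d.components.attoseconds / 1_000_000_000_000_000)
    print(ms) // 1 * 1000 + 500 = 1500

    // .milliseconds(-1) has components(seconds: 0, attoseconds: -10^15),
    // so totalMilliseconds evaluates to -1 and totalMillisecondsOrMinusOne keeps it.
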
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
index 6e539402..029aac1e 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
@@ -45,18 +45,19 @@ struct RDKafkaTopicConfig {
                 let buf = UnsafeRawBufferPointer(
                     UnsafeMutableRawBufferPointer(
                         start: configValue,
-                        count: sizeNoNullTerm))
+                        count: sizeNoNullTerm
+                    ))
                 _ = $0.initialize(from: buf)
-               return sizeNoNullTerm
+                return sizeNoNullTerm
             }
             if wasVal == value {
                 return // Values are equal, avoid changing (not mark config as modified)
             }
         }
-        
+
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
         defer { errorChars.deallocate() }
-        
+
         let configResult = rd_kafka_topic_conf_set(
             configPointer,
             key,
diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
index 11552c0f..1e92bd3d 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift
@@ -15,7 +15,7 @@
 import Crdkafka
 
 /// Swift wrapper type for `rd_kafka_topic_partition_list_t`.
-final public class RDKafkaTopicPartitionList {
+public final class RDKafkaTopicPartitionList {
     private let _internal: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>
 
     /// Create a new topic+partition list.
@@ -57,7 +57,7 @@ final public class RDKafkaTopicPartitionList {
     func withListPointer<T>(_ body: (UnsafeMutablePointer<rd_kafka_topic_partition_list_t>) throws -> T) rethrows -> T {
         return try body(self._internal)
     }
-    
+
     /// Scoped accessor that enables safe access to the pointer of the underlying `rd_kafka_topic_partition_t`.
     /// - Warning: Do not escape the pointer from the closure for later use.
     /// - Parameter body: The closure will use the pointer.
diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift
index 8db61192..92b8da48 100644
--- a/Tests/IntegrationTests/SwiftKafkaTests.swift
+++ b/Tests/IntegrationTests/SwiftKafkaTests.swift
@@ -13,11 +13,11 @@
 //===----------------------------------------------------------------------===//
 
 import struct Foundation.UUID
+import Logging // TODO: remove
 import NIOCore
 import ServiceLifecycle
 @testable import SwiftKafka
 import XCTest
-import Logging // TODO: remove
 
 // For testing locally on Mac, do the following:
 //
@@ -494,26 +494,20 @@ final class SwiftKafkaTests: XCTestCase {
             XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
         }
     }
-    
+
     func testProduceAndConsumeWithTransaction() async throws {
         let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: 10)
-        
+
         self.producerConfig.debug = [.all]
 
         let (producer, events) = try KafkaProducer.makeProducerWithEvents(config: self.producerConfig, logger: .kafkaTest)
-        
-        var transactionConfigProducer = self.producerConfig!
-        transactionConfigProducer.transactionalId = "234567"
-        transactionConfigProducer.bootstrapServers = [
-            .init(host: "linux-dev", port: 9092)
-        ]
 
-        var transactionalProducerConfig = KafkaTransactionalProducerConfiguration(
-            transactionalId: "1234",
-            producerConfiguration: transactionConfigProducer)
+        var transactionalProducerConfig = KafkaTransactionalProducerConfiguration(transactionalId: "1234")
 
-        transactionalProducerConfig.transactionsTimeout = .seconds(20)
-        let transactionalProducer = try await KafkaTransactionalProducer(config: transactionalProducerConfig, logger: .kafkaTest)
+        transactionalProducerConfig.bootstrapServers = [self.bootstrapServer]
+        transactionalProducerConfig.broker.addressFamily = .v4
+
+        let transactionalProducer = try await KafkaTransactionalProducer(config: transactionalProducerConfig, logger: .kafkaTest)
 
         let makeConsumerConfig = { (topic: String) -> KafkaConsumerConfiguration in
             var consumerConfig = KafkaConsumerConfiguration(
@@ -525,12 +519,12 @@ final class SwiftKafkaTests: XCTestCase {
             consumerConfig.enableAutoCommit = false
             return consumerConfig
         }
-        
+
         let consumer = try KafkaConsumer(
             config: makeConsumerConfig(uniqueTestTopic),
             logger: .kafkaTest
         )
-        
+
         let consumerAfterTransaction = try KafkaConsumer(
             config: makeConsumerConfig(uniqueTestTopic2),
             logger: .kafkaTest
@@ -541,7 +535,7 @@ final class SwiftKafkaTests: XCTestCase {
                 producer,
                 consumer,
                 transactionalProducer,
-                consumerAfterTransaction
+                consumerAfterTransaction,
             ],
             configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
             logger: .kafkaTest
@@ -574,7 +568,8 @@ final class SwiftKafkaTests: XCTestCase {
                     try await transactionalProducer.withTransaction { transaction in
                         let newMessage = KafkaProducerMessage(
                             topic: self.uniqueTestTopic2,
-                            value: message.value.description + "_updated")
+                            value: message.value.description + "_updated"
+                        )
                         try transaction.send(newMessage)
                         let partitionlist = RDKafkaTopicPartitionList()
                         partitionlist.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: Int64(message.offset))
@@ -587,7 +582,7 @@ final class SwiftKafkaTests: XCTestCase {
                 }
                 print("Changed all messages \(count)")
             }
-            
+
             group.addTask {
                 var count = 0
                 for try await messageAfterTransaction in consumerAfterTransaction.messages {

From 6501aaafbacb5f011a1c1db9da62cb40a5d678f0 Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Thu, 10 Aug 2023 18:19:52 +0300
Subject: [PATCH 5/5] remove merge artifact

---
 .../KafkaProducerConfiguration.swift          | 149 ------------------
 1 file changed, 149 deletions(-)
 delete mode 100644 Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
deleted file mode 100644
index 8037176e..00000000
--- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
+++ /dev/null
@@ -1,149 +0,0 @@
-//===----------------------------------------------------------------------===//
-//
-// This source file is part of the swift-kafka-gsoc open source project
-//
-// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
-// Licensed under Apache License v2.0
-//
-// See LICENSE.txt for license information
-// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-//===----------------------------------------------------------------------===//
-
-public struct KafkaProducerConfiguration {
-    // MARK: - SwiftKafka-specific Config properties
-
-    /// The time between two consecutive polls.
-    /// Effectively controls the rate at which incoming events are consumed.
-    /// Default: `.milliseconds(100)`
-    public var pollInterval: Duration = .milliseconds(100)
-
-    /// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
-    /// Default: `10000`
-    public var flushTimeoutMilliseconds: Int = 10000 {
-        didSet {
-            precondition(
-                0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
-                "Flush timeout outside of valid range \(0...Int32.max)"
-            )
-        }
-    }
-
-    // MARK: - Producer-specific Config Properties
-
-    /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
-    /// Default: `false`
-    public var enableIdempotence: Bool = false
-
-    /// Producer queue options.
-    public var queue: KafkaConfiguration.QueueOptions = .init()
-
-    /// How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
-    /// Default: `2_147_483_647`
-    public var messageSendMaxRetries: Int = 2_147_483_647
-
-    /// Allow automatic topic creation on the broker when producing to non-existent topics.
-    /// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
-    /// Default: `true`
-    public var allowAutoCreateTopics: Bool = true
-
-    // MARK: - Common Client Config Properties
-
-    /// Client identifier.
-    /// Default: `"rdkafka"`
-    public var clientID: String = "rdkafka"
-
-    /// Initial list of brokers.
-    /// Default: `[]`
-    public var bootstrapServers: [KafkaConfiguration.Broker] = []
-
-    /// Message options.
-    public var message: KafkaConfiguration.MessageOptions = .init()
-
-    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
-    /// Default: `100_000_000`
-    public var receiveMessageMaxBytes: Int = 100_000_000
-
-    /// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
-    /// Default: `1_000_000`
-    public var maxInFlightRequestsPerConnection: Int = 1_000_000
-
-    /// Metadata cache max age.
-    /// Default: `900_000`
-    public var metadataMaxAgeMilliseconds: Int = 900_000
-
-    /// Topic metadata options.
-    public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()
-
-    /// Topic denylist.
-    /// Default: `[]`
-    public var topicDenylist: [String] = []
-
-    /// Debug options.
-    /// Default: `[]`
-    public var debug: [KafkaConfiguration.DebugOption] = []
-
-    /// Socket options.
-    public var socket: KafkaConfiguration.SocketOptions = .init()
-
-    /// Broker options.
-    public var broker: KafkaConfiguration.BrokerOptions = .init()
-
-    /// Reconnect options.
-    public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
-
-    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
-    /// Default: `.plaintext`
-    public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
-
-    public init() {}
-}
-
-// MARK: - KafkaProducerConfiguration + Dictionary
-
-extension KafkaProducerConfiguration {
-    internal var dictionary: [String: String] {
-        sharedPropsDictionary
-    }
-}
-
-// MARK: - KafkaProducerConfiguration + Hashable
-
-extension KafkaProducerConfiguration: Hashable {}
-
-// MARK: - KafkaProducerConfiguration + Sendable
-
-extension KafkaProducerConfiguration: Sendable {}
-
-extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
-
-// MARK: - KafkaConfiguration + Producer Additions
-
-extension KafkaConfiguration {
-    /// Producer queue options.
-    public struct QueueOptions: Sendable, Hashable {
-        /// Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. A value of 0 disables this limit.
-        /// Default: `100_000`
-        public var bufferingMaxMessages: Int = 100_000
-
-        /// Maximum total message size sum allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than queue.buffering.max.messages.
-        /// Default: `1_048_576`
-        public var bufferingMaxKBytes: Int = 1_048_576
-
-        /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
-        /// Default: `5`
-        public var bufferingMaxMilliseconds: Int = 5
-
-        public init(
-            bufferingMaxMessages: Int = 100_000,
-            bufferingMaxKBytes: Int = 1_048_576,
-            bufferingMaxMilliseconds: Int = 5
-        ) {
-            self.bufferingMaxMessages = bufferingMaxMessages
-            self.bufferingMaxKBytes = bufferingMaxKBytes
-            self.bufferingMaxMilliseconds = bufferingMaxMilliseconds
-        }
-    }
-}