diff --git a/client/src/main/scala/cakesolutions/kafka/KafkaProducerRecord.scala b/client/src/main/scala/cakesolutions/kafka/KafkaProducerRecord.scala index 4c0f419..ba017be 100644 --- a/client/src/main/scala/cakesolutions/kafka/KafkaProducerRecord.scala +++ b/client/src/main/scala/cakesolutions/kafka/KafkaProducerRecord.scala @@ -3,6 +3,8 @@ package cakesolutions.kafka import cakesolutions.kafka.KafkaTopicPartition.{Partition, Topic} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header +import scala.collection.JavaConverters._ /** * Helper functions for creating Kafka's `ProducerRecord`s. @@ -59,6 +61,24 @@ object KafkaProducerRecord { case None => new ProducerRecord(topic, value) } + /** + * Create a producer record with an optional key. + * + * @param topic the topic where the record will be appended to + * @param key optional key that will be included in the record + * @param value the value that will be included in the record + * @param headers the record headers + * @tparam Key type of the key + * @tparam Value type of the value + * @return producer record + */ + def apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value, + headers: Seq[Header]): ProducerRecord[Key, Value] = + key match { + case Some(k) => apply(topic, k, value, headers) + case None => apply(topic, value, headers) + } + /** * Create a producer record with topic, key, and value. * @@ -72,6 +92,21 @@ object KafkaProducerRecord { def apply[Key, Value](topic: String, key: Key, value: Value): ProducerRecord[Key, Value] = new ProducerRecord(topic, key, value) + /** + * Create a producer record with topic, key, and value. 
+ * + * @param topic the topic where the record will be appended to + * @param key the key that will be included in the record + * @param value the value that will be included in the record + * @param headers the record headers + * @tparam Key type of the key + * @tparam Value type of the value + * @return producer record + */ + def apply[Key, Value](topic: String, key: Key, value: Value, + headers: Seq[Header]): ProducerRecord[Key, Value] = + new ProducerRecord(topic, null, key, value, headers.asJava) + /** * Create a producer record without a key. * @@ -84,6 +119,20 @@ object KafkaProducerRecord { def apply[Key, Value](topic: String, value: Value): ProducerRecord[Key, Value] = new ProducerRecord(topic, value) + /** + * Create a producer record without a key. + * + * @param topic topic to which record is being sent + * @param value the value that will be included in the record + * @param headers the record headers + * @tparam Key type of the key + * @tparam Value type of the value + * @return producer record + */ + def apply[Key >: Null, Value](topic: String, value: Value, + headers: Seq[Header]): ProducerRecord[Key, Value] = + new ProducerRecord(topic, null, null, null, value, headers.asJava) + /** * Create a producer record from a topic selection, optional key, value, and optional timestamp. * @@ -100,13 +149,34 @@ object KafkaProducerRecord { key: Option[Key] = None, value: Value, timestamp: Option[Long] = None - ): ProducerRecord[Key, Value] = { + ): ProducerRecord[Key, Value] = + apply(topicPartitionSelection, key, value, timestamp, Seq.empty) + + /** + * Create a producer record from a topic selection, optional key, value, and optional timestamp. 
+ * + * @param topicPartitionSelection the topic (with optional partition) where the record will be appended to + * @param key the key that will be included in the record + * @param value the value that will be included in the record + * @param timestamp the timestamp of the record + * @param headers the record headers + * @tparam Key type of the key + * @tparam Value type of the value + * @return producer record + */ + def apply[Key >: Null, Value]( + topicPartitionSelection: Destination, + key: Option[Key], + value: Value, + timestamp: Option[Long], + headers: Seq[Header] + ): ProducerRecord[Key, Value] = { val topic = topicPartitionSelection.topic val partition = topicPartitionSelection.partition.map(i => i: java.lang.Integer).orNull val nullableKey = key.orNull val nullableTimestamp = timestamp.map(i => i: java.lang.Long).orNull - new ProducerRecord[Key, Value](topic, partition, nullableTimestamp, nullableKey, value) + new ProducerRecord[Key, Value](topic, partition, nullableTimestamp, nullableKey, value, headers.asJava) } /** diff --git a/client/src/test/scala/cakesolutions/kafka/KafkaProducerRecordSpec.scala b/client/src/test/scala/cakesolutions/kafka/KafkaProducerRecordSpec.scala new file mode 100644 index 0000000..5131a78 --- /dev/null +++ b/client/src/test/scala/cakesolutions/kafka/KafkaProducerRecordSpec.scala @@ -0,0 +1,88 @@ +package cakesolutions.kafka + +import java.nio.ByteBuffer +import java.util.UUID + +import cakesolutions.kafka.KafkaProducerRecord.Destination +import org.apache.kafka.common.header.internals.RecordHeader +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers + +import scala.collection.JavaConverters._ + +class KafkaProducerRecordSpec extends AnyFlatSpecLike with Matchers { + private val headers = Seq( + new RecordHeader("foo", "kafka".getBytes), + new RecordHeader("bar", ByteBuffer.wrap("apache".getBytes)) + ) + type K = String + type V = UUID + private val topic = "scala-kafka-client" + 
private val k = "west" + private val v = UUID.randomUUID() + private val dest = Destination(topic = topic) + + "apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value)" should "create the specified record" in { + val rec = KafkaProducerRecord(topic, Some(k), v) + rec.topic() shouldBe topic + rec.key() shouldBe k + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe Seq.empty + } + + "apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value, headers: Seq[Header])" should "create the specified record" in { + val rec = KafkaProducerRecord(topic, Some(k), v, headers) + rec.topic() shouldBe topic + rec.key() shouldBe k + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe headers + } + + "apply[Key, Value](topic: String, key: Key, value: Value)" should "create the specified record" in { + val rec = KafkaProducerRecord(topic, k, v) + rec.topic() shouldBe topic + rec.key() shouldBe k + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe Seq.empty + } + + "apply[Key, Value](topic: String, key: Key, value: Value, headers: Seq[Header])" should "create the specified record" in { + val rec = KafkaProducerRecord(topic, k, v, headers) + rec.topic() shouldBe topic + rec.key() shouldBe k + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe headers + } + + "apply[Key >: Null, Value](topic: String, value: Value)" should "create the specified record" in { + val rec = KafkaProducerRecord[K, V](topic, v) + rec.topic() shouldBe topic + rec.key() shouldBe null + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe Seq.empty + } + + "apply[Key >: Null, Value](topic: String, value: Value, headers: Seq[Header])" should "create the specified record" in { + val rec = KafkaProducerRecord[K, V](topic, v, headers) + rec.topic() shouldBe topic + rec.key() shouldBe null + rec.value() shouldBe v + rec.headers().asScala.toSeq shouldBe headers + } + + "apply[Key >: Null, Value](topicPartitionSelection: Destination, key: 
Option[Key] = None, value: Value, timestamp: Option[Long] = None)" should "create the specified record" in { +    val rec = KafkaProducerRecord(dest, Some(k), v) +    rec.topic() shouldBe topic +    rec.key() shouldBe k +    rec.value() shouldBe v +    rec.headers().asScala.toSeq shouldBe Seq.empty +  } + +  "apply[Key >: Null, Value](topicPartitionSelection: Destination, key: Option[Key] = None, value: Value, timestamp: Option[Long] = None, headers: Seq[Header])" should "create the specified record" in { +    val rec = KafkaProducerRecord(dest, Some(k), v, timestamp = None, headers = headers) +    rec.topic() shouldBe topic +    rec.key() shouldBe k +    rec.value() shouldBe v +    rec.headers().asScala.toSeq shouldBe headers +  } +}