diff --git a/.travis.yml b/.travis.yml index d638b8b..c6c0532 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: scala jdk: oraclejdk8 scala: - 2.11.12 - - 2.12.6 + - 2.12.12 script: - sbt ++$TRAVIS_SCALA_VERSION -Dakka.test.timefactor=5 clean coverage test coverageReport @@ -21,4 +21,4 @@ cache: before_cache: # Cleanup the cached directories to avoid unnecessary cache updates - find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete - - find $HOME/.sbt -name "*.lock" -print -delete \ No newline at end of file + - find $HOME/.sbt -name "*.lock" -print -delete diff --git a/README.md b/README.md index 91bfff2..89be816 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Scala support for Apache Kafka's Java client library 0.9.0.x - 2.3.1 +# Scala support for Apache Kafka's Java client library 0.9.0.x - 2.6.0 [![Join the chat at https://gitter.im/cakesolutions/scala-kafka-client](https://badges.gitter.im/cakesolutions/scala-kafka-client.svg)](https://gitter.im/cakesolutions/scala-kafka-client?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Build status](https://travis-ci.org/cakesolutions/scala-kafka-client.svg?branch=master)](https://travis-ci.org/cakesolutions/scala-kafka-client) @@ -40,7 +40,7 @@ For configuration and usage, see the Wiki: SBT library dependency: ```scala -libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.3.1" +libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.6.0" ``` ### Akka Integration @@ -54,7 +54,7 @@ For configuration and usage, see the Wiki: SBT library dependency: ```scala -libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-akka" % "2.3.1" +libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-akka" % "2.6.0" ``` ### TestKit @@ -68,7 +68,7 @@ For usage, see the Wiki: SBT library dependency: ```scala -libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-testkit" % "2.3.1" % "test" 
+libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-testkit" % "2.6.0" % "test" ``` ## Version Compatibility @@ -89,6 +89,7 @@ Here is the full table of binary compatibilities between Scala Kafka client and Scala Kafka client | Kafka Java Driver --------------------- | ----------------- + 2.6.0 | 2.6.0 2.3.1 | 2.3.1 2.1.0 | 2.1.0 2.0.0 | 2.0.0 @@ -104,6 +105,9 @@ Here is the full table of binary compatibilities between Scala Kafka client and ## Change log +### 2.6.0 - 10/2020 +* Update to Kafka 2.6.0 + ### 2.3.1 - 11/2019 * Update to Kafka 2.3.1 diff --git a/akka/build.sbt b/akka/build.sbt index 49974c4..93c6d6a 100644 --- a/akka/build.sbt +++ b/akka/build.sbt @@ -18,5 +18,5 @@ libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % versions.scalaTest % "test", "org.scalatestplus" %% "mockito-1-10" % "3.1.0.0" % "test", "org.mockito" % "mockito-core" % "3.2.11" % "test", - "ch.qos.logback" % "logback-classic" % "1.1.3" % "test" -) \ No newline at end of file + "ch.qos.logback" % "logback-classic" % versions.logback % "test" +) diff --git a/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala b/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala index 5333ecc..ac3370b 100644 --- a/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala +++ b/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala @@ -1,7 +1,10 @@ package cakesolutions.kafka.akka +import java.lang + import java.time.LocalDateTime import java.time.temporal.ChronoUnit +import java.time.{Duration => JavaDuration} import akka.actor._ import cakesolutions.kafka.KafkaConsumer @@ -516,7 +519,7 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag]( */ implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] = offsetQuery - .map { case (tp, time) => tp -> new java.lang.Long(time) } + .map { case (tp, time) => tp -> lang.Long.valueOf(time) } 
.asJava type Records = ConsumerRecords[K, V] @@ -886,7 +889,7 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag]( private def pollKafka(state: StateData, timeout: Int): Option[Records] = tryWithConsumer(state) { log.debug("Poll Kafka for {} milliseconds", timeout) - val rs = consumer.poll(timeout) + val rs = consumer.poll(JavaDuration.ofMillis(timeout)) log.debug("Poll Complete!") if (rs.count() > 0) Some(ConsumerRecords(currentConsumerOffsets, rs)) diff --git a/akka/src/test/scala/cakesolutions/kafka/akka/ConsumerRecordsSpec.scala b/akka/src/test/scala/cakesolutions/kafka/akka/ConsumerRecordsSpec.scala index e6b933c..ec041fc 100644 --- a/akka/src/test/scala/cakesolutions/kafka/akka/ConsumerRecordsSpec.scala +++ b/akka/src/test/scala/cakesolutions/kafka/akka/ConsumerRecordsSpec.scala @@ -1,9 +1,11 @@ package cakesolutions.kafka.akka import cakesolutions.kafka.KafkaTopicPartition -import org.scalatest.{FlatSpecLike, Inside, Matchers} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.Inside +import org.scalatest.matchers.should.Matchers -class ConsumerRecordsSpec extends FlatSpecLike with Matchers with Inside { +class ConsumerRecordsSpec extends AnyFlatSpecLike with Matchers with Inside { val partition = KafkaTopicPartition("sometopic", 0) val knownInput: ConsumerRecords[String, Int] = ConsumerRecords.fromPairs(partition, Seq(Some("foo") -> 1)) diff --git a/akka/src/test/scala/cakesolutions/kafka/akka/KafkaIntSpec.scala b/akka/src/test/scala/cakesolutions/kafka/akka/KafkaIntSpec.scala index 2e859b2..12e878f 100644 --- a/akka/src/test/scala/cakesolutions/kafka/akka/KafkaIntSpec.scala +++ b/akka/src/test/scala/cakesolutions/kafka/akka/KafkaIntSpec.scala @@ -4,14 +4,16 @@ import akka.actor.ActorSystem import akka.testkit.TestKit import cakesolutions.kafka.testkit.KafkaServer import org.scalatest.concurrent.Waiters -import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} +import org.scalatest.flatspec.AnyFlatSpecLike +import 
org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers /** * ScalaTest base class for scala-kafka-client-testkit based integration tests */ class KafkaIntSpec(_system: ActorSystem) extends TestKit(_system) with Waiters - with FlatSpecLike + with AnyFlatSpecLike with Matchers with BeforeAndAfterAll { diff --git a/akka/src/test/scala/cakesolutions/kafka/akka/TrackPartionsSpec.scala b/akka/src/test/scala/cakesolutions/kafka/akka/TrackPartitionsSpec.scala similarity index 95% rename from akka/src/test/scala/cakesolutions/kafka/akka/TrackPartionsSpec.scala rename to akka/src/test/scala/cakesolutions/kafka/akka/TrackPartitionsSpec.scala index 8103a0c..e3d0d48 100644 --- a/akka/src/test/scala/cakesolutions/kafka/akka/TrackPartionsSpec.scala +++ b/akka/src/test/scala/cakesolutions/kafka/akka/TrackPartitionsSpec.scala @@ -13,7 +13,9 @@ import org.apache.kafka.common.serialization.StringDeserializer import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar import scala.collection.JavaConverters._ @@ -21,8 +23,8 @@ import scala.concurrent.duration._ import scala.util.Random -class TrackPartionsSpec(system_ : ActorSystem) extends TestKit(system_) - with FlatSpecLike +class TrackPartitionsSpec(system_ : ActorSystem) extends TestKit(system_) + with AnyFlatSpecLike with Matchers with BeforeAndAfterAll with MockitoSugar { diff --git a/build.sbt b/build.sbt index 4cb26d5..70d137f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "net.cakesolutions", - scalaVersion := "2.12.10", - crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"), + scalaVersion := "2.12.12", + crossScalaVersions := Seq("2.11.12", "2.12.12", 
"2.13.3"), publishMavenStyle := true, bintrayOrganization := Some("cakesolutions"), bintrayPackageLabels := Seq("scala", "kafka"), @@ -82,4 +82,4 @@ lazy val root = project.in(file(".")) .settings(commonSettings: _*) .enablePlugins(ScalaUnidocPlugin) .settings(name := "scala-kafka-client-root", publishArtifact := false, publish := {}, publishLocal := {}) - .aggregate(scalaKafkaClient, scalaKafkaClientAkka, kafkaTestkit) \ No newline at end of file + .aggregate(scalaKafkaClient, scalaKafkaClientAkka, kafkaTestkit) diff --git a/client/build.sbt b/client/build.sbt index 0af401f..866138e 100644 --- a/client/build.sbt +++ b/client/build.sbt @@ -13,5 +13,5 @@ libraryDependencies ++= Seq( //Test deps "org.slf4j" % "log4j-over-slf4j" % versions.slf4j % "test", "org.scalatest" %% "scalatest" % versions.scalaTest % "test", - "ch.qos.logback" % "logback-classic" % "1.1.3" % "test" -) \ No newline at end of file + "ch.qos.logback" % "logback-classic" % versions.logback % "test" +) diff --git a/client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala b/client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala index c58525c..5197fa3 100644 --- a/client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala +++ b/client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala @@ -1,10 +1,11 @@ package cakesolutions.kafka +import java.lang + import cakesolutions.kafka.TypesafeConfigExtensions._ -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigFactory} import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy, KafkaConsumer => JKafkaConsumer} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.requests.IsolationLevel +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.serialization.Deserializer import scala.collection.JavaConverters._ @@ -23,7 +24,7 @@ object KafkaConsumer { */ implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): 
java.util.Map[TopicPartition, java.lang.Long] = offsetQuery - .map { case (tp, time) => tp -> new java.lang.Long(time) } + .map { case (tp, time) => tp -> lang.Long.valueOf(time) } .asJava /** @@ -66,7 +67,7 @@ object KafkaConsumer { isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED ): Conf[K, V] = { - val configMap = Map[String, AnyRef]( + val configMap = ConfigFactory.parseMap(Map( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> enableAutoCommit.toString, @@ -77,7 +78,7 @@ object KafkaConsumer { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> maxPollInterval.toString, ConsumerConfig.METADATA_MAX_AGE_CONFIG ->maxMetaDataAge.toString, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset.toString.toLowerCase, - ConsumerConfig.ISOLATION_LEVEL_CONFIG -> isolationLevel.toString.toLowerCase() + ConsumerConfig.ISOLATION_LEVEL_CONFIG -> isolationLevel.toString.toLowerCase()).asJava ) apply(configMap, keyDeserializer, valueDeserializer) diff --git a/client/src/main/scala/cakesolutions/kafka/KafkaProducer.scala b/client/src/main/scala/cakesolutions/kafka/KafkaProducer.scala index eba7bea..71640f1 100644 --- a/client/src/main/scala/cakesolutions/kafka/KafkaProducer.scala +++ b/client/src/main/scala/cakesolutions/kafka/KafkaProducer.scala @@ -1,6 +1,6 @@ package cakesolutions.kafka -import java.util.concurrent.TimeUnit +import java.time.Duration import cakesolutions.kafka.TypesafeConfigExtensions._ import com.typesafe.config.Config @@ -155,7 +155,7 @@ object KafkaProducer { } transactionalId.foreach(tid => - configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tid.toString) + configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tid) ) apply(configMap.toMap, keySerializer, valueSerializer) @@ -283,7 +283,7 @@ final class KafkaProducer[K, V](val producer: JProducer[K, V]) extends KafkaProd producer.close() override def close(timeout: FiniteDuration): Unit = - 
producer.close(timeout.toMillis, TimeUnit.MILLISECONDS) + producer.close(Duration.ofNanos(timeout.toNanos)) private def producerCallback(promise: Promise[RecordMetadata]): Callback = producerCallback(result => promise.complete(result)) @@ -297,4 +297,4 @@ final class KafkaProducer[K, V](val producer: JProducer[K, V]) extends KafkaProd callback(result) } } -} \ No newline at end of file +} diff --git a/client/src/test/scala/cakesolutions/kafka/ConsumerProducerIntSpec.scala b/client/src/test/scala/cakesolutions/kafka/ConsumerProducerIntSpec.scala index 47bc224..9d074d8 100644 --- a/client/src/test/scala/cakesolutions/kafka/ConsumerProducerIntSpec.scala +++ b/client/src/test/scala/cakesolutions/kafka/ConsumerProducerIntSpec.scala @@ -1,5 +1,7 @@ package cakesolutions.kafka +import java.time.Duration + import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.{ConsumerRecords, OffsetResetStrategy} import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} @@ -11,6 +13,7 @@ import scala.util.Random class ConsumerProducerIntSpec extends KafkaIntSpec { private val log = LoggerFactory.getLogger(getClass) + private val timeout = Duration.ofMillis(1000) private def randomString: String = Random.alphanumeric.take(5).mkString("") @@ -61,14 +64,14 @@ class ConsumerProducerIntSpec extends KafkaIntSpec { val consumer = KafkaConsumer(consumerFromDirectConfig) consumer.subscribe(List(topic).asJava) - val records1 = consumer.poll(1000) + val records1 = consumer.poll(timeout) records1.count() shouldEqual 0 log.info("Kafka producer connecting on port: [{}]", kafkaPort) producer.send(KafkaProducerRecord(topic, Some("key"), "value")) producer.flush() - val records2: ConsumerRecords[String, String] = consumer.poll(1000) + val records2: ConsumerRecords[String, String] = consumer.poll(timeout) records2.count() shouldEqual 1 producer.close() @@ -88,7 +91,7 @@ class ConsumerProducerIntSpec extends KafkaIntSpec { 
consumer.subscribe(List(topic).asJava) - val records2: ConsumerRecords[String, String] = consumer.poll(5000) + val records2: ConsumerRecords[String, String] = consumer.poll(timeout.multipliedBy(5)) records2.count() shouldEqual 1 producer.close() @@ -108,7 +111,7 @@ class ConsumerProducerIntSpec extends KafkaIntSpec { consumer.subscribe(List(topic).asJava) val count = (1 to 30).map { _ => - consumer.poll(1000).count + consumer.poll(timeout).count }.sum consumer.close() count diff --git a/client/src/test/scala/cakesolutions/kafka/IdempotentProducerSpec.scala b/client/src/test/scala/cakesolutions/kafka/IdempotentProducerSpec.scala index f2ce317..60de6c3 100644 --- a/client/src/test/scala/cakesolutions/kafka/IdempotentProducerSpec.scala +++ b/client/src/test/scala/cakesolutions/kafka/IdempotentProducerSpec.scala @@ -2,7 +2,7 @@ package cakesolutions.kafka import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.requests.IsolationLevel +import org.apache.kafka.common.IsolationLevel import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.slf4j.LoggerFactory diff --git a/client/src/test/scala/cakesolutions/kafka/KafkaConsumerSpec.scala b/client/src/test/scala/cakesolutions/kafka/KafkaConsumerSpec.scala index 995f73b..671f9cb 100644 --- a/client/src/test/scala/cakesolutions/kafka/KafkaConsumerSpec.scala +++ b/client/src/test/scala/cakesolutions/kafka/KafkaConsumerSpec.scala @@ -1,5 +1,7 @@ package cakesolutions.kafka +import java.time.Duration + import org.apache.kafka.clients.consumer.ConsumerRecords import org.scalatest.concurrent.Waiters.Waiter @@ -40,14 +42,15 @@ class KafkaConsumerSpec extends KafkaIntSpec { val consumer = KafkaConsumer(consumerConfig) consumer.subscribe(List(topic).asJava) - val records1 = consumer.poll(1000) + val timeout = Duration.ofMillis(1000) + val records1 = consumer.poll(timeout) records1.count() shouldEqual 0 log.info("Kafka 
producer connecting on port: [{}]", kafkaPort) producer.send(KafkaProducerRecord(topic, Some("key"), "value")) producer.flush() - val records2: ConsumerRecords[String, String] = consumer.poll(1000) + val records2: ConsumerRecords[String, String] = consumer.poll(timeout) records2.count() shouldEqual 1 producer.close() @@ -59,7 +62,7 @@ class KafkaConsumerSpec extends KafkaIntSpec { val topic = randomString log.info(s"Using topic [$topic] and kafka port [$kafkaPort]") - val badSerializer = (msg: String) => { + val badSerializer = (_: String) => { throw new Exception("Serialization failed") } diff --git a/client/src/test/scala/cakesolutions/kafka/KafkaIntSpec.scala b/client/src/test/scala/cakesolutions/kafka/KafkaIntSpec.scala index 33e2d0b..0371351 100644 --- a/client/src/test/scala/cakesolutions/kafka/KafkaIntSpec.scala +++ b/client/src/test/scala/cakesolutions/kafka/KafkaIntSpec.scala @@ -1,9 +1,11 @@ package cakesolutions.kafka import cakesolutions.kafka.testkit.KafkaServer -import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers -class KafkaIntSpec extends FlatSpecLike with Matchers with BeforeAndAfterAll { +class KafkaIntSpec extends AnyFlatSpecLike with Matchers with BeforeAndAfterAll { val kafkaServer = new KafkaServer() val kafkaPort = kafkaServer.kafkaPort diff --git a/examples/build.sbt b/examples/build.sbt index 931f4ea..07979cd 100644 --- a/examples/build.sbt +++ b/examples/build.sbt @@ -11,4 +11,4 @@ libraryDependencies ++= Seq( "ch.qos.logback" % "logback-classic" % versions.logback, "org.slf4j" % "slf4j-api" % versions.slf4j, "org.slf4j" % "log4j-over-slf4j" % versions.slf4j -) \ No newline at end of file +) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 58d7607..85c3c3b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,10 +1,10 @@ object Dependencies { 
object versions { - val slf4j = "1.7.29" + val slf4j = "1.7.30" val logback = "1.2.3" - val scalaTest = "3.1.0" - val akka = "2.5.26" - val kafka = "2.4.0" + val scalaTest = "3.2.2" + val akka = "2.5.32" + val kafka = "2.6.0" val typesafeConfig = "1.4.0" } } diff --git a/project/build.properties b/project/build.properties index e7d6cbf..8b798b1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.3.5 +sbt.version = 1.3.13 diff --git a/testkit/build.sbt b/testkit/build.sbt index e9e10f6..4b64216 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -12,6 +12,6 @@ libraryDependencies ++= Seq( exclude("org.slf4j", "slf4j-log4j12"), //Test deps - "org.apache.curator" % "curator-test" % "2.7.0", //3.0.0 + "org.apache.curator" % "curator-test" % "5.1.0", "ch.qos.logback" % "logback-classic" % versions.logback % "test" -) \ No newline at end of file + "ch.qos.logback" % "logback-classic" % versions.logback % "test" +) diff --git a/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala b/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala index 93dbcc5..2f6607b 100644 --- a/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala +++ b/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala @@ -2,6 +2,7 @@ package cakesolutions.kafka.testkit import java.io.File import java.net.ServerSocket +import java.time.Duration import kafka.server.{KafkaConfig, KafkaServerStartable} import org.apache.curator.test.TestingServer @@ -151,6 +152,7 @@ final class KafkaServer( ): Seq[(Option[Key], Value)] = { val extendedConfig: Map[String, Object] = consumerConfig + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServerAddress) val consumer = new KafkaConsumer(extendedConfig.asJava, keyDeserializer, valueDeserializer) + val pollingTimeout = Duration.ofMillis(100) try { consumer.subscribe(List(topic).asJava) @@ -160,7 +162,7 @@ final class KafkaServer( val start = System.currentTimeMillis() while (total < expectedNumOfRecords &&
System.currentTimeMillis() < start + timeout) { - val records = consumer.poll(100) + val records = consumer.poll(pollingTimeout) val kvs = records.asScala.map(r => (Option(r.key()), r.value())) collected ++= kvs total += records.count() diff --git a/version.sbt b/version.sbt index 341046b..9b46dae 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "2.4.0-SNAPSHOT" +version in ThisBuild := "2.6.0-SNAPSHOT"