Skip to content

update to Kafka 2.6.0 #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: scala
jdk: oraclejdk8
scala:
- 2.11.12
- 2.12.6
- 2.12.12

script:
- sbt ++$TRAVIS_SCALA_VERSION -Dakka.test.timefactor=5 clean coverage test coverageReport
Expand All @@ -21,4 +21,4 @@ cache:
before_cache:
# Cleanup the cached directories to avoid unnecessary cache updates
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
- find $HOME/.sbt -name "*.lock" -print -delete
- find $HOME/.sbt -name "*.lock" -print -delete
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Scala support for Apache Kafka's Java client library 0.9.0.x - 2.3.1
# Scala support for Apache Kafka's Java client library 0.9.0.x - 2.6.0

[![Join the chat at https://gitter.im/cakesolutions/scala-kafka-client](https://badges.gitter.im/cakesolutions/scala-kafka-client.svg)](https://gitter.im/cakesolutions/scala-kafka-client?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build status](https://travis-ci.org/cakesolutions/scala-kafka-client.svg?branch=master)](https://travis-ci.org/cakesolutions/scala-kafka-client)
Expand Down Expand Up @@ -40,7 +40,7 @@ For configuration and usage, see the Wiki:
SBT library dependency:

```scala
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.3.1"
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.6.0"
```

### Akka Integration
Expand All @@ -54,7 +54,7 @@ For configuration and usage, see the Wiki:
SBT library dependency:

```scala
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-akka" % "2.3.1"
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-akka" % "2.6.0"
```

### TestKit
Expand All @@ -68,7 +68,7 @@ For usage, see the Wiki:
SBT library dependency:

```scala
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-testkit" % "2.3.1" % "test"
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-testkit" % "2.6.0" % "test"
```

## Version Compatibility
Expand All @@ -89,6 +89,7 @@ Here is the full table of binary compatibilities between Scala Kafka client and

Scala Kafka client | Kafka Java Driver
--------------------- | -----------------
2.6.0 | 2.6.0
2.3.1 | 2.3.1
2.1.0 | 2.1.0
2.0.0 | 2.0.0
Expand All @@ -104,6 +105,9 @@ Here is the full table of binary compatibilities between Scala Kafka client and

## Change log

### 2.6.0 - 10/2020
* Update to Kafka 2.6.0

### 2.3.1 - 11/2019
* Update to Kafka 2.3.1

Expand Down
4 changes: 2 additions & 2 deletions akka/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % versions.scalaTest % "test",
"org.scalatestplus" %% "mockito-1-10" % "3.1.0.0" % "test",
"org.mockito" % "mockito-core" % "3.2.11" % "test",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test"
)
"ch.qos.logback" % "logback-classic" % versions.logback % "test"
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package cakesolutions.kafka.akka

import java.lang

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import java.time.{Duration => JavaDuration}

import akka.actor._
import cakesolutions.kafka.KafkaConsumer
Expand Down Expand Up @@ -516,7 +519,7 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
*/
implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
offsetQuery
.map { case (tp, time) => tp -> new java.lang.Long(time) }
.map { case (tp, time) => tp -> lang.Long.valueOf(time) }
.asJava

type Records = ConsumerRecords[K, V]
Expand Down Expand Up @@ -886,7 +889,7 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
private def pollKafka(state: StateData, timeout: Int): Option[Records] =
tryWithConsumer(state) {
log.debug("Poll Kafka for {} milliseconds", timeout)
val rs = consumer.poll(timeout)
val rs = consumer.poll(JavaDuration.ofMillis(timeout))
log.debug("Poll Complete!")
if (rs.count() > 0)
Some(ConsumerRecords(currentConsumerOffsets, rs))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cakesolutions.kafka.akka

import cakesolutions.kafka.KafkaTopicPartition
import org.scalatest.{FlatSpecLike, Inside, Matchers}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.Inside
import org.scalatest.matchers.should.Matchers

class ConsumerRecordsSpec extends FlatSpecLike with Matchers with Inside {
class ConsumerRecordsSpec extends AnyFlatSpecLike with Matchers with Inside {

val partition = KafkaTopicPartition("sometopic", 0)
val knownInput: ConsumerRecords[String, Int] = ConsumerRecords.fromPairs(partition, Seq(Some("foo") -> 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import akka.actor.ActorSystem
import akka.testkit.TestKit
import cakesolutions.kafka.testkit.KafkaServer
import org.scalatest.concurrent.Waiters
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers

/**
* ScalaTest base class for scala-kafka-client-testkit based integration tests
*/
class KafkaIntSpec(_system: ActorSystem) extends TestKit(_system)
with Waiters
with FlatSpecLike
with AnyFlatSpecLike
with Matchers
with BeforeAndAfterAll {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.Random


class TrackPartionsSpec(system_ : ActorSystem) extends TestKit(system_)
with FlatSpecLike
class TrackPartitionsSpec(system_ : ActorSystem) extends TestKit(system_)
with AnyFlatSpecLike
with Matchers
with BeforeAndAfterAll with MockitoSugar {

Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lazy val commonSettings = Seq(
organization := "net.cakesolutions",
scalaVersion := "2.12.10",
crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"),
scalaVersion := "2.12.12",
crossScalaVersions := Seq("2.11.12", "2.12.12", "2.13.3"),
publishMavenStyle := true,
bintrayOrganization := Some("cakesolutions"),
bintrayPackageLabels := Seq("scala", "kafka"),
Expand Down Expand Up @@ -82,4 +82,4 @@ lazy val root = project.in(file("."))
.settings(commonSettings: _*)
.enablePlugins(ScalaUnidocPlugin)
.settings(name := "scala-kafka-client-root", publishArtifact := false, publish := {}, publishLocal := {})
.aggregate(scalaKafkaClient, scalaKafkaClientAkka, kafkaTestkit)
.aggregate(scalaKafkaClient, scalaKafkaClientAkka, kafkaTestkit)
4 changes: 2 additions & 2 deletions client/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ libraryDependencies ++= Seq(
//Test deps
"org.slf4j" % "log4j-over-slf4j" % versions.slf4j % "test",
"org.scalatest" %% "scalatest" % versions.scalaTest % "test",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test"
)
"ch.qos.logback" % "logback-classic" % versions.logback % "test"
)
13 changes: 7 additions & 6 deletions client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package cakesolutions.kafka

import java.lang

import cakesolutions.kafka.TypesafeConfigExtensions._
import com.typesafe.config.Config
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy, KafkaConsumer => JKafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.serialization.Deserializer

import scala.collection.JavaConverters._
Expand All @@ -23,7 +24,7 @@ object KafkaConsumer {
*/
implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
offsetQuery
.map { case (tp, time) => tp -> new java.lang.Long(time) }
.map { case (tp, time) => tp -> lang.Long.valueOf(time) }
.asJava

/**
Expand Down Expand Up @@ -66,7 +67,7 @@ object KafkaConsumer {
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED
): Conf[K, V] = {

val configMap = Map[String, AnyRef](
val configMap = ConfigFactory.parseMap(Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> enableAutoCommit.toString,
Expand All @@ -77,7 +78,7 @@ object KafkaConsumer {
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> maxPollInterval.toString,
ConsumerConfig.METADATA_MAX_AGE_CONFIG ->maxMetaDataAge.toString,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset.toString.toLowerCase,
ConsumerConfig.ISOLATION_LEVEL_CONFIG -> isolationLevel.toString.toLowerCase()
ConsumerConfig.ISOLATION_LEVEL_CONFIG -> isolationLevel.toString.toLowerCase()).asJava
)

apply(configMap, keyDeserializer, valueDeserializer)
Expand Down
8 changes: 4 additions & 4 deletions client/src/main/scala/cakesolutions/kafka/KafkaProducer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cakesolutions.kafka

import java.util.concurrent.TimeUnit
import java.time.Duration

import cakesolutions.kafka.TypesafeConfigExtensions._
import com.typesafe.config.Config
Expand Down Expand Up @@ -155,7 +155,7 @@ object KafkaProducer {
}

transactionalId.foreach(tid =>
configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tid.toString)
configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tid)
)

apply(configMap.toMap, keySerializer, valueSerializer)
Expand Down Expand Up @@ -283,7 +283,7 @@ final class KafkaProducer[K, V](val producer: JProducer[K, V]) extends KafkaProd
producer.close()

override def close(timeout: FiniteDuration): Unit =
producer.close(timeout.toMillis, TimeUnit.MILLISECONDS)
producer.close(Duration.ofNanos(timeout.toNanos))

private def producerCallback(promise: Promise[RecordMetadata]): Callback =
producerCallback(result => promise.complete(result))
Expand All @@ -297,4 +297,4 @@ final class KafkaProducer[K, V](val producer: JProducer[K, V]) extends KafkaProd
callback(result)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package cakesolutions.kafka

import java.time.Duration

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerRecords, OffsetResetStrategy}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
Expand All @@ -11,6 +13,7 @@ import scala.util.Random
class ConsumerProducerIntSpec extends KafkaIntSpec {

private val log = LoggerFactory.getLogger(getClass)
private val timeout = Duration.ofMillis(1000)

private def randomString: String = Random.alphanumeric.take(5).mkString("")

Expand Down Expand Up @@ -61,14 +64,14 @@ class ConsumerProducerIntSpec extends KafkaIntSpec {
val consumer = KafkaConsumer(consumerFromDirectConfig)
consumer.subscribe(List(topic).asJava)

val records1 = consumer.poll(1000)
val records1 = consumer.poll(timeout)
records1.count() shouldEqual 0

log.info("Kafka producer connecting on port: [{}]", kafkaPort)
producer.send(KafkaProducerRecord(topic, Some("key"), "value"))
producer.flush()

val records2: ConsumerRecords[String, String] = consumer.poll(1000)
val records2: ConsumerRecords[String, String] = consumer.poll(timeout)
records2.count() shouldEqual 1

producer.close()
Expand All @@ -88,7 +91,7 @@ class ConsumerProducerIntSpec extends KafkaIntSpec {

consumer.subscribe(List(topic).asJava)

val records2: ConsumerRecords[String, String] = consumer.poll(5000)
val records2: ConsumerRecords[String, String] = consumer.poll(timeout.multipliedBy(5))
records2.count() shouldEqual 1

producer.close()
Expand All @@ -108,7 +111,7 @@ class ConsumerProducerIntSpec extends KafkaIntSpec {
consumer.subscribe(List(topic).asJava)

val count = (1 to 30).map { _ =>
consumer.poll(1000).count
consumer.poll(timeout).count
}.sum
consumer.close()
count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cakesolutions.kafka

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.IsolationLevel
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package cakesolutions.kafka

import java.time.Duration

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.scalatest.concurrent.Waiters.Waiter

Expand Down Expand Up @@ -40,14 +42,15 @@ class KafkaConsumerSpec extends KafkaIntSpec {
val consumer = KafkaConsumer(consumerConfig)
consumer.subscribe(List(topic).asJava)

val records1 = consumer.poll(1000)
val timeout = Duration.ofMillis(1000)
val records1 = consumer.poll(timeout)
records1.count() shouldEqual 0

log.info("Kafka producer connecting on port: [{}]", kafkaPort)
producer.send(KafkaProducerRecord(topic, Some("key"), "value"))
producer.flush()

val records2: ConsumerRecords[String, String] = consumer.poll(1000)
val records2: ConsumerRecords[String, String] = consumer.poll(timeout)
records2.count() shouldEqual 1

producer.close()
Expand All @@ -59,7 +62,7 @@ class KafkaConsumerSpec extends KafkaIntSpec {
val topic = randomString
log.info(s"Using topic [$topic] and kafka port [$kafkaPort]")

val badSerializer = (msg: String) => {
val badSerializer = (_: String) => {
throw new Exception("Serialization failed")
}

Expand Down
6 changes: 4 additions & 2 deletions client/src/test/scala/cakesolutions/kafka/KafkaIntSpec.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cakesolutions.kafka

import cakesolutions.kafka.testkit.KafkaServer
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers

class KafkaIntSpec extends FlatSpecLike with Matchers with BeforeAndAfterAll {
class KafkaIntSpec extends AnyFlatSpecLike with Matchers with BeforeAndAfterAll {
val kafkaServer = new KafkaServer()
val kafkaPort = kafkaServer.kafkaPort

Expand Down
2 changes: 1 addition & 1 deletion examples/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % versions.logback,
"org.slf4j" % "slf4j-api" % versions.slf4j,
"org.slf4j" % "log4j-over-slf4j" % versions.slf4j
)
)
8 changes: 4 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
object Dependencies {
object versions {
val slf4j = "1.7.29"
val slf4j = "1.7.30"
val logback = "1.2.3"
val scalaTest = "3.1.0"
val akka = "2.5.26"
val kafka = "2.4.0"
val scalaTest = "3.2.2"
val akka = "2.5.32"
val kafka = "2.6.0"
val typesafeConfig = "1.4.0"
}
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.3.5
sbt.version = 1.3.1
Loading