diff --git a/.travis.yml b/.travis.yml index bc83a10..c31dcae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: scala jdk: - - oraclejdk8 + - openjdk8 scala: - 2.11.12 diff --git a/build.sbt b/build.sbt index c1ef360..4fe4c23 100644 --- a/build.sbt +++ b/build.sbt @@ -2,8 +2,8 @@ import sbtrelease.Version parallelExecution in ThisBuild := false -val kafkaVersion = "2.0.0" -val confluentVersion = "5.0.0" +val kafkaVersion = "2.3.1" +val confluentVersion = "5.3.1" val akkaVersion = "2.5.14" lazy val commonSettings = Seq( diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala index 8453257..49dc20c 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala @@ -13,9 +13,7 @@ import org.apache.avro.specific.{ import org.apache.kafka.common.serialization.{Deserializer, Serializer} class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) - extends Deserializer[T] - with NoOpConfiguration - with NoOpClose { + extends Deserializer[T] { private val reader = new SpecificDatumReader[T](schema) @@ -25,10 +23,7 @@ class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) } } -class KafkaAvroSerializer[T <: SpecificRecord]() - extends Serializer[T] - with NoOpConfiguration - with NoOpClose { +class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] { private def toBytes(nullableData: T): Array[Byte] = Option(nullableData).fold[Array[Byte]](null) { data => @@ -46,11 +41,3 @@ class KafkaAvroSerializer[T <: SpecificRecord]() override def serialize(topic: String, data: T): Array[Byte] = toBytes(data) } - -sealed trait NoOpConfiguration { - def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () -} - -sealed trait NoOpClose { - def close(): Unit = () -} diff --git a/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala b/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala index 47663c8..da236ed 100644 --- a/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala +++ b/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala @@ -30,7 +30,7 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig { topicsToCreate.foreach(topic => createCustomTopic(topic)) val streamId = UUIDs.newUuid().toString val streams = - new KafkaStreams(topology, streamConfig(streamId, extraConfig)) + new KafkaStreams(topology, streamProps(streamId, extraConfig)) streams.start() try { block diff --git a/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/TestStreamsConfig.scala b/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/TestStreamsConfig.scala index d3e6f58..b847dcf 100644 --- a/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/TestStreamsConfig.scala +++ b/kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/TestStreamsConfig.scala @@ -1,6 +1,7 @@ package net.manub.embeddedkafka.streams import java.nio.file.Files +import java.util.Properties import net.manub.embeddedkafka.EmbeddedKafkaConfig import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy} @@ -17,9 +18,9 @@ trait TestStreamsConfig { * @param kafkaConfig the Kafka test configuration * @return the Streams configuration */ - def streamConfig(streamName: String, - extraConfig: Map[String, AnyRef] = Map.empty)( - implicit kafkaConfig: EmbeddedKafkaConfig): StreamsConfig = { + def streamProps(streamName: String, + extraConfig: Map[String, AnyRef] = Map.empty)( + implicit kafkaConfig: EmbeddedKafkaConfig): Properties = { import scala.collection.JavaConverters._ val defaultConfig = Map( @@ -33,6 +34,9 @@ trait TestStreamsConfig { ) val configOverwrittenByExtra = defaultConfig ++ extraConfig - new StreamsConfig(configOverwrittenByExtra.asJava) + + val props = new Properties() + props.putAll(configOverwrittenByExtra.asJava) + props } } diff --git a/schema-registry/src/main/scala/net.manub.embeddedkafka/schemaregistry/streams/EmbeddedKafkaStreamsWithSchemaRegistry.scala b/schema-registry/src/main/scala/net.manub.embeddedkafka/schemaregistry/streams/EmbeddedKafkaStreamsWithSchemaRegistry.scala index b1da7dc..f54df42 100644 --- a/schema-registry/src/main/scala/net.manub.embeddedkafka/schemaregistry/streams/EmbeddedKafkaStreamsWithSchemaRegistry.scala +++ b/schema-registry/src/main/scala/net.manub.embeddedkafka/schemaregistry/streams/EmbeddedKafkaStreamsWithSchemaRegistry.scala @@ -39,11 +39,9 @@ trait EmbeddedKafkaStreamsWithSchemaRegistry withRunningKafka { topicsToCreate.foreach(topic => createCustomTopic(topic)) val streamId = UUIDs.newUuid().toString - val streams = - new KafkaStreams( - topology, - streamConfig(streamId, - extraConfig ++ consumerConfigForSchemaRegistry)) + val streams = new KafkaStreams( + topology, + streamProps(streamId, extraConfig ++ consumerConfigForSchemaRegistry)) streams.start() try { block diff --git a/version.sbt b/version.sbt index 4ce70f5..5783339 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "2.1.0-SNAPSHOT" +version in ThisBuild := "2.3.1-SNAPSHOT"