Skip to content

Kafka 2.3.1 #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: scala

jdk:
- oraclejdk8
- openjdk8

scala:
- 2.11.12
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import sbtrelease.Version

parallelExecution in ThisBuild := false

val kafkaVersion = "2.0.0"
val confluentVersion = "5.0.0"
val kafkaVersion = "2.3.1"
val confluentVersion = "5.3.1"
val akkaVersion = "2.5.14"

lazy val commonSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import org.apache.avro.specific.{
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
extends Deserializer[T]
with NoOpConfiguration
with NoOpClose {
extends Deserializer[T] {

private val reader = new SpecificDatumReader[T](schema)

Expand All @@ -25,10 +23,7 @@ class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
}
}

class KafkaAvroSerializer[T <: SpecificRecord]()
extends Serializer[T]
with NoOpConfiguration
with NoOpClose {
class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] {

private def toBytes(nullableData: T): Array[Byte] =
Option(nullableData).fold[Array[Byte]](null) { data =>
Expand All @@ -46,11 +41,3 @@ class KafkaAvroSerializer[T <: SpecificRecord]()
override def serialize(topic: String, data: T): Array[Byte] =
toBytes(data)
}

// Mixin providing a do-nothing `configure` so serde classes need not
// implement it themselves. NOTE(review): Kafka 2.x made Serializer/
// Deserializer#configure a default method, which is presumably why this
// PR removes the trait — confirm against the Kafka client API version.
sealed trait NoOpConfiguration {
// Intentionally ignores all supplied configs; returns Unit.
def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
}

// Mixin providing a do-nothing `close` for serdes that hold no resources.
// NOTE(review): Kafka 2.x made Serializer/Deserializer#close a default
// method, making this trait redundant — hence its removal in this diff.
sealed trait NoOpClose {
// No resources to release; intentionally a no-op.
def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
val streams =
new KafkaStreams(topology, streamConfig(streamId, extraConfig))
new KafkaStreams(topology, streamProps(streamId, extraConfig))
streams.start()
try {
block
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.manub.embeddedkafka.streams

import java.nio.file.Files
import java.util.Properties

import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
Expand All @@ -17,9 +18,9 @@ trait TestStreamsConfig {
* @param kafkaConfig the Kafka test configuration
* @return the Streams configuration
*/
def streamConfig(streamName: String,
extraConfig: Map[String, AnyRef] = Map.empty)(
implicit kafkaConfig: EmbeddedKafkaConfig): StreamsConfig = {
def streamProps(streamName: String,
extraConfig: Map[String, AnyRef] = Map.empty)(
implicit kafkaConfig: EmbeddedKafkaConfig): Properties = {
import scala.collection.JavaConverters._

val defaultConfig = Map(
Expand All @@ -33,6 +34,9 @@ trait TestStreamsConfig {
)
val configOverwrittenByExtra = defaultConfig ++
extraConfig
new StreamsConfig(configOverwrittenByExtra.asJava)

val props = new Properties()
props.putAll(configOverwrittenByExtra.asJava)
props
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ trait EmbeddedKafkaStreamsWithSchemaRegistry
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
val streams =
new KafkaStreams(
topology,
streamConfig(streamId,
extraConfig ++ consumerConfigForSchemaRegistry))
val streams = new KafkaStreams(
topology,
streamProps(streamId, extraConfig ++ consumerConfigForSchemaRegistry))
streams.start()
try {
block
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "2.1.0-SNAPSHOT"
version in ThisBuild := "2.3.1-SNAPSHOT"