Skip to content

Commit 364ace7

Browse files
authored
Fix error handling and partition config bugs in KafkaMessagingProvider.ensureTopic (#5527)
1 parent 8f8a4aa commit 364ace7

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

Diff for: common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short, consumerLagCheckInterval: Finit
4141
object KafkaMessagingProvider extends MessagingProvider {
4242
import KafkaConfiguration._
4343

44+
private val topicPartitionsConfigKey = "partitions"
45+
4446
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
4547
implicit logging: Logging,
4648
actorSystem: ActorSystem): MessageConsumer =
@@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider {
6466

6567
Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
6668
.flatMap(client => {
67-
val partitions = topicConfig.getOrElse("partitions", "1").toInt
68-
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
69+
val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt
70+
val safeTopicConfig = topicConfig - topicPartitionsConfigKey
71+
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)
6972

7073
def createTopic(retries: Int = 5): Try[Unit] = {
7174
Try(client.listTopics().names().get())
72-
.map(topics =>
75+
.flatMap(topics =>
7376
if (topics.contains(topic)) {
7477
Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
7578
} else {

0 commit comments

Comments
 (0)