From f3fac12b20e4c59fd4eae5cd62dce3bed3d4097e Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 13 Jan 2025 13:58:01 +0100 Subject: [PATCH] Kafka Admin Client: creating once instead of many times --- .../flink/TopicSelectionStrategySpec.scala | 13 ++++---- .../KafkaUniversalComponentTransformer.scala | 8 ++--- .../schemedkafka/TopicSelectionStrategy.scala | 33 ++++++++----------- .../schemaregistry/SchemaRegistryClient.scala | 3 +- 4 files changed, 26 insertions(+), 31 deletions(-) diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala index 14e34f6c576..8999a6ec2db 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala @@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig) test("all topic strategy test") { - val strategy = new TopicsWithExistingSubjectSelectionStrategy() - strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List( + val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient) + strategy.getTopics.toList.map(_.toSet) shouldBe List( Set( RecordTopic, RecordTopicWithKey, @@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource } test("topic filtering strategy test") { - val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*")) - strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List( + val strategy = + new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile(".*Record.*")) + strategy.getTopics.toList shouldBe List( List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey) ) } @@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource testModelDependencies, new FlinkKafkaSourceImplFactory(None) ) { - override def topicSelectionStrategy = - new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*")) + override lazy val topicSelectionStrategy = + new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile("test-.*")) } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala index 9da6f208136..b1ed8ada828 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala @@ -47,10 +47,10 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid @transient protected lazy val schemaRegistryClient: SchemaRegistryClient = schemaRegistryClientFactory.create(kafkaConfig) - protected def topicSelectionStrategy: TopicSelectionStrategy = { + @transient protected lazy val topicSelectionStrategy: TopicSelectionStrategy = { if (kafkaConfig.showTopicsWithoutSchema) { - new AllNonHiddenTopicsSelectionStrategy - } else new TopicsWithExistingSubjectSelectionStrategy + new AllNonHiddenTopicsSelectionStrategy(schemaRegistryClient, kafkaConfig) + } else new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient) } @transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig @@ -67,7 +67,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid protected def getTopicParam( implicit nodeId: NodeId ): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = { - val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig) + val topics = topicSelectionStrategy.getTopics (topics match { case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics) diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala index 1693ddea4fb..6c9a38c458a 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala @@ -13,30 +13,24 @@ import scala.jdk.CollectionConverters._ trait TopicSelectionStrategy extends Serializable { - def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] + def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] } -class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy { +class TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient: SchemaRegistryClient) + extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { schemaRegistryClient.getAllTopics } } -class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy { +// FIXME: AdminClient +class AllNonHiddenTopicsSelectionStrategy(schemaRegistryClient: SchemaRegistryClient, kafkaConfig: KafkaConfig) + extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics val schemaLessTopics: List[UnspecializedTopicName] = { @@ -68,13 +62,12 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy { } -class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern) - extends TopicSelectionStrategy { +class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy( + schemaRegistryClient: SchemaRegistryClient, + topicPattern: Pattern +) extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches())) } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala index 8319b0c021f..71b5cd905a3 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala @@ -40,11 +40,12 @@ trait SchemaRegistryClient extends Serializable { def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]] + // FIXME: strategy created once def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = { if (!kafkaConfig.showTopicsWithoutSchema) { true } else { - val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig) + val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy(this).getTopics topicsWithSchema.exists(_.map(_.name).contains(topic)) } }