Skip to content

Commit

Permalink
Kafka Admin Client: creating once instead of many times
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 13, 2025
1 parent e7008c8 commit f3fac12
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig)

test("all topic strategy test") {
val strategy = new TopicsWithExistingSubjectSelectionStrategy()
strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List(
val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient)
strategy.getTopics.toList.map(_.toSet) shouldBe List(
Set(
RecordTopic,
RecordTopicWithKey,
Expand All @@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
}

test("topic filtering strategy test") {
val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"))
strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List(
val strategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile(".*Record.*"))
strategy.getTopics.toList shouldBe List(
List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey)
)
}
Expand All @@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
testModelDependencies,
new FlinkKafkaSourceImplFactory(None)
) {
override def topicSelectionStrategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
override lazy val topicSelectionStrategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile("test-.*"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
@transient protected lazy val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientFactory.create(kafkaConfig)

protected def topicSelectionStrategy: TopicSelectionStrategy = {
@transient protected lazy val topicSelectionStrategy: TopicSelectionStrategy = {
if (kafkaConfig.showTopicsWithoutSchema) {
new AllNonHiddenTopicsSelectionStrategy
} else new TopicsWithExistingSubjectSelectionStrategy
new AllNonHiddenTopicsSelectionStrategy(schemaRegistryClient, kafkaConfig)
} else new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient)
}

@transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig
Expand All @@ -67,7 +67,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
protected def getTopicParam(
implicit nodeId: NodeId
): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig)
val topics = topicSelectionStrategy.getTopics

(topics match {
case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,24 @@ import scala.jdk.CollectionConverters._

trait TopicSelectionStrategy extends Serializable {

def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]]
def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]]

}

class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy {
class TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient: SchemaRegistryClient)
extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
schemaRegistryClient.getAllTopics
}

}

class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
// FIXME: AdminClient
class AllNonHiddenTopicsSelectionStrategy(schemaRegistryClient: SchemaRegistryClient, kafkaConfig: KafkaConfig)
extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics

val schemaLessTopics: List[UnspecializedTopicName] = {
Expand Down Expand Up @@ -68,13 +62,12 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {

}

class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern)
extends TopicSelectionStrategy {
class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(
schemaRegistryClient: SchemaRegistryClient,
topicPattern: Pattern
) extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches()))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ trait SchemaRegistryClient extends Serializable {

def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]

// FIXME: strategy created once
def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
if (!kafkaConfig.showTopicsWithoutSchema) {
true
} else {
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy(this).getTopics
topicsWithSchema.exists(_.map(_.name).contains(topic))
}
}
Expand Down

0 comments on commit f3fac12

Please sign in to comment.