diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/config/KafkaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/config/KafkaConfiguration.java index 72eea29..88f4014 100644 --- a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/config/KafkaConfiguration.java +++ b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/config/KafkaConfiguration.java @@ -3,6 +3,7 @@ import com.techprimers.kafka.springbootkafkaconsumerexample.model.User; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -18,11 +19,21 @@ @Configuration public class KafkaConfiguration { + @Value("${kafka.bootstrapAddress}") + private String bootstrapAddress; + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + @Bean public ConsumerFactory consumerFactory() { Map config = new HashMap<>(); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -30,20 +41,18 @@ public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(config); } - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); - factory.setConsumerFactory(consumerFactory()); + public ConcurrentKafkaListenerContainerFactory userKafkaListenerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(userConsumerFactory()); return factory; } - @Bean public ConsumerFactory userConsumerFactory() { Map config = new HashMap<>(); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); @@ -51,11 +60,4 @@ public ConsumerFactory userConsumerFactory() { new JsonDeserializer<>(User.class)); } - @Bean - public ConcurrentKafkaListenerContainerFactory userKafkaListenerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(userConsumerFactory()); - return factory; - } - } diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/listener/KafkaConsumer.java b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/listener/KafkaConsumer.java index b81cec7..48c5a15 100644 --- a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/listener/KafkaConsumer.java +++ b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/listener/KafkaConsumer.java @@ -13,7 +13,7 @@ public void consume(String message) { } - @KafkaListener(topics = "Kafka_Example_json", group = "group_json", + @KafkaListener(topics = "Kafka_Example_User", group = "group_json", containerFactory = "userKafkaListenerFactory") public void consumeJson(User user) { System.out.println("Consumed JSON Message: " + user); diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/model/User.java b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/model/User.java index fbaaee0..ad249e5 100644 --- a/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/model/User.java +++ b/src/main/java/com/techprimers/kafka/springbootkafkaconsumerexample/model/User.java @@ -25,7 +25,6 @@ public User() { } public User(String name, String dept) { - this.name = name; this.dept = dept; } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bafddce..655e237 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ -server.port=8081 \ No newline at end of file +server.port=8081 +kafka.bootstrapAddress=localhost:9092