diff --git a/be/src/main/java/movlit/be/common/config/RedisStreamConfig.java b/be/src/main/java/movlit/be/common/config/RedisStreamConfig.java index 5aa3f2ce..ad0ddba5 100644 --- a/be/src/main/java/movlit/be/common/config/RedisStreamConfig.java +++ b/be/src/main/java/movlit/be/common/config/RedisStreamConfig.java @@ -1,11 +1,15 @@ package movlit.be.common.config; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisException; +import jakarta.annotation.PreDestroy; import java.time.Duration; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.stream.StreamMessageListenerContainer; @@ -17,9 +21,8 @@ @Slf4j public class RedisStreamConfig { - // TODO : AppConfig에서 정의한 threadPoolExecutor의 설정이 Redis Stream 처리에도 적합한지 확인.(스레드 풀 크키, 큐 크기) -// private final ThreadPoolExecutor threadPoolExecutor; -// private final RedisConnectionFactory redisConnectionFactory; + // 종료 상태 플래그 (shutdown 중인지 여부) + private volatile boolean shuttingDown = false; @Bean public StreamMessageListenerContainer> streamMessageListenerContainer( @@ -38,6 +41,17 @@ public StreamMessageListenerContainer> .executor(taskExecutor) .batchSize(10) .pollTimeout(Duration.ofSeconds(1)) + .errorHandler( + e -> { + // shutdown 중이고, 예외 메시지가 "Connection closed"인 경우만 무시 +// if (shuttingDown && e.getMessage() != null && e.getMessage() +// .contains("Redis exception")) { + if (e instanceof RedisSystemException || e instanceof RedisException) { + log.debug("Ignored connection closed exception during shutdown: {}", e.getMessage()); + } else { + log.error("Unexpected error in stream polling task", e); + } + }) .build(); StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( @@ -47,4 +61,11 @@ public StreamMessageListenerContainer> return container; } + @PreDestroy + public void preDestroy() { + // 애플리케이션 컨텍스트 종료 전에 shutdown 플래그를 true로 설정 + shuttingDown = true; + log.info("RedisStreamConfig is shutting down. Setting shutdown flag to true."); + } + } diff --git a/be/src/main/java/movlit/be/pub_sub/chatMessage/application/service/ChatMessageConsumer.java b/be/src/main/java/movlit/be/pub_sub/chatMessage/application/service/ChatMessageConsumer.java index b07eded9..1358de58 100644 --- a/be/src/main/java/movlit/be/pub_sub/chatMessage/application/service/ChatMessageConsumer.java +++ b/be/src/main/java/movlit/be/pub_sub/chatMessage/application/service/ChatMessageConsumer.java @@ -1,6 +1,7 @@ package movlit.be.pub_sub.chatMessage.application.service; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.Consumer; @@ -25,13 +26,16 @@ public class ChatMessageConsumer { private static final String CONSUMER_GROUP = "chat_message_group"; // Consumer 그룹 이름 private static final String CONSUMER_NAME = "chat_message_consumer"; // Consumer 이름 + // 구독(Subscription)을 필드에 보관하여 종료 시점에 취소할 수 있게 함 + private Subscription subscription; + @PostConstruct public void init() { log.info("==== ChatMessageConsumer init()"); createConsumerGroup(); // Listener Container 시작 - Subscription subscription = streamMessageListenerContainer.receive( + subscription = streamMessageListenerContainer.receive( Consumer.from(CONSUMER_GROUP, CONSUMER_NAME), StreamOffset.create(MESSAGE_QUEUE, ReadOffset.lastConsumed()), chatMessageStreamListener @@ -48,4 +52,17 @@ private void createConsumerGroup() { } } + // 애플리케이션 종료 시 호출되어 리스너와 컨테이너를 종료 + @PreDestroy + public void shutdown() { + if (subscription != null) { + subscription.cancel(); + log.info("Subscription cancelled."); + } + if (streamMessageListenerContainer != null && streamMessageListenerContainer.isRunning()) { + streamMessageListenerContainer.stop(); + log.info("StreamMessageListenerContainer stopped."); + } + } + }