Skip to content

Commit

Permalink
Fix(be) 테스트 환경에서 애플리케이션 종료 시 Redis connection 에러 handler 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
minyoongi96 committed Feb 5, 2025
1 parent 2af662e commit 0106076
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
27 changes: 24 additions & 3 deletions be/src/main/java/movlit/be/common/config/RedisStreamConfig.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package movlit.be.common.config;

import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
Expand All @@ -17,9 +21,8 @@
@Slf4j
public class RedisStreamConfig {

// TODO : AppConfig에서 정의한 threadPoolExecutor의 설정이 Redis Stream 처리에도 적합한지 확인.(스레드 풀 크키, 큐 크기)
// private final ThreadPoolExecutor threadPoolExecutor;
// private final RedisConnectionFactory redisConnectionFactory;
// 종료 상태 플래그 (shutdown 중인지 여부)
private volatile boolean shuttingDown = false;

@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
Expand All @@ -38,6 +41,17 @@ public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
.executor(taskExecutor)
.batchSize(10)
.pollTimeout(Duration.ofSeconds(1))
.errorHandler(
e -> {
// shutdown 중이고, 예외 메시지가 "Connection closed"인 경우만 무시
// if (shuttingDown && e.getMessage() != null && e.getMessage()
// .contains("Redis exception")) {
if (e instanceof RedisSystemException || e instanceof RedisException) {
log.debug("Ignored connection closed exception during shutdown: {}", e.getMessage());
} else {
log.error("Unexpected error in stream polling task", e);
}
})
.build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
Expand All @@ -47,4 +61,11 @@ public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
return container;
}

@PreDestroy
public void preDestroy() {
// 애플리케이션 컨텍스트 종료 전에 shutdown 플래그를 true로 설정
shuttingDown = true;
log.info("RedisStreamConfig is shutting down. Setting shutdown flag to true.");
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package movlit.be.pub_sub.chatMessage.application.service;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.Consumer;
Expand All @@ -25,13 +26,16 @@ public class ChatMessageConsumer {
private static final String CONSUMER_GROUP = "chat_message_group"; // Consumer 그룹 이름
private static final String CONSUMER_NAME = "chat_message_consumer"; // Consumer 이름

// 구독(Subscription)을 필드에 보관하여 종료 시점에 취소할 수 있게 함
private Subscription subscription;

@PostConstruct
public void init() {
log.info("==== ChatMessageConsumer init()");
createConsumerGroup();

// Listener Container 시작
Subscription subscription = streamMessageListenerContainer.receive(
subscription = streamMessageListenerContainer.receive(
Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
StreamOffset.create(MESSAGE_QUEUE, ReadOffset.lastConsumed()),
chatMessageStreamListener
Expand All @@ -48,4 +52,17 @@ private void createConsumerGroup() {
}
}

// 애플리케이션 종료 시 호출되어 리스너와 컨테이너를 종료
@PreDestroy
public void shutdown() {
if (subscription != null) {
subscription.cancel();
log.info("Subscription cancelled.");
}
if (streamMessageListenerContainer != null && streamMessageListenerContainer.isRunning()) {
streamMessageListenerContainer.stop();
log.info("StreamMessageListenerContainer stopped.");
}
}

}

0 comments on commit 0106076

Please sign in to comment.