-
Notifications
You must be signed in to change notification settings - Fork 3
대기열 구현
서버에대한 대규모 동시
요청 방지
- 서버에 동시에 많은 요청이 발생하면, 서버 부하가 급증해서 병목, 성능 저하 등 서버 장애 발생
- 이를 방지하기 위해 텀을 두고 순차적으로 트래픽을 흘려보내는 것
- 인메모리 DB
- 구현 편의성
- 여러 자료구조 제공
- key에 대한 TTL을 설정할 수 있음
자료구조: sorted set
- 대기열은 정렬이 되어있어야 한다.
- 연결이 끊긴 유저를 대기열에서 선택적으로 제거해야 한다.
-
Redis lists는 선형탐색으로 인해 시간복잡도가
O(n)
-
Redis sorted sets은
skip list
,hash table
로 구현되어있어 노드를 선택 제거하는데O(log(n))
이 걸린다.
-
Redis lists는 선형탐색으로 인해 시간복잡도가
SSE
를 사용해서 서버에서 클라이언트로 메시지를 전송한다.
- 클라이언트에서 서버에 API를 계속 호출하는
폴링
방식의 경우 성능 면에서 비효율적이다. - 클라이언트에서 서버로 메시지를 전송하는 경우는 없으므로
웹소켓
대신 부하가 좀 더 적은SSE
를 사용한다.
- Stream Publisher(스케줄러)
- 대기열(Waiting Queue)에 브라우저 커넥션 ID 저장/삭제 (userId가 아닌, 브라우저 커넥션별로 구분)
- 주기적으로 입장 처리 및 대기 정보를 Stream에 Publish
- 메인 서버와 분리 가능
- Stream Consumer
- 클라이언트에게 SSE를 통해 메시지를 전송
- WaitingState: 대기 번호, 총 대기 인원수, 예상 대기시간
- EntryPermission: 입장 토큰
- 클라이언트에게 SSE를 통해 메시지를 전송
- 수평 확장 가능
대기열 입장 및 대기열 상태 구독 /api/queue/stream
@RestController
@RequestMapping("/api/queue")
@RequiredArgsConstructor
@Tag(name = "Queue API", description = "Queue API 엔드포인트")
public class QueueController {
private final ServerRegistry serverRegistry;
private final QueueService queueService;
private final SseHandler sseHandler;
@Operation(summary = "공연 예매 대기열 입장 및 구독")
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(
@AuthenticationPrincipal MemberPrincipal principal,
@RequestParam("sessionId") Long performanceSessionId
) {
// SSE 세션 정보 생성
QueueSession session = queueService.genQueueSession(performanceSessionId, principal.id(),
sseHandler.genConnectionId());
// 대기열 입장
queueService.enterWaitingLine(performanceSessionId, session.getConnectionId());
// SSE 구독 및 emitter 반환
return sseHandler.subscribe(session, () -> {
// 연결 종료 시 수행할 작업
sseHandler.removeEmitter(session.getConnectionId()); // emitter 제거
queueService.remove(performanceSessionId, session.getConnectionId()); // 대기열에서 제거
serverRegistry.remove(session.getConnectionId()); // 연결된 서버 정보 제거
}).getEmitter();
}
}
- 대기열 입장 및 대기열 상태를 구독합니다.
- 각 공연 세션별로 대기열이 구분됩니다.
- 클라이언트는 SSE를 통해 지속적으로 본인의 대기열 번호를 수신할 수 있습니다.
- 대기 완료 후 입장 토큰을 받습니다. 이후
구역 및 좌석 조회 ~ 예매
API 요청을 할 수 있습니다.- 예매 이후 결제 과정에서는 사용자가 직접 결제 정보를 입력하는 과정에서 자연스럽게 트래픽이 분산될 것이라고 판단해서 제한하지 않았습니다.
서버 확장성을 위해, 대기 정보
(대기 번호, 총 대기 인원수, 예상 대기시간)와 입장 메시지
(입장토큰)를 하나의 대기열 서버에서 발행하고, 각 SSE 서버에서는 그 정보를 받아 SSE로 전송하는 방식으로 설계했습니다.
이를 구현하기 위한 방법으로 3가지를 고려했습니다. Redis Stream
, Redis Pub/Sub
, Kafka
최종적으로는 Redis Stream
을 사용하게 되었습니다.
-
메시지를 저장하지 않고 즉시 전파하기 때문에 지연이 거의없고 속도가 빠른 편
-
메시지를 발행하는 순간, 연결된 모든 구독자에게 브로드캐스트 방식으로 전송됨
-
이로 인해, 각 SSE 서버는 자신이 관리하지 않는 세션에 대한 메시지도 모두 수신하게 되어 비효율적
→ ex) A 서버는 세션 1만 관리하지만, 세션 2~1000번에 대한 메시지도 모두 전달받음
-
성능 문제가 발생할지는 확실하지 않지만, 불필요한 네트워크 및 처리 리소스 낭비가 우려됨
-
메시지가 저장되지 않기 때문에, 일시적인 장애나 네트워크 단절 시 메시지 유실 가능 ⇒
대기 정보
는 괜찮지만,입장 메시지
에 부적합 (가장 큰 이유)
- 메시지를 로그처럼 저장하는 구조로, 한 번 발행된 메시지는 이후에도 조회 가능
- 각 SSE 서버는 Redis Stream의 Consumer Group 내에서 자신이 담당하는 세션의 Stream만 읽도록 설계 가능 → 불필요한 메시지를 수신하지 않음 = 고립된 소비 + 높은 효율
-
메시지 순서 보장, 전송 이력 추적 가능, 처리 실패 시 Pending Entry List(PEL)로 재처리 가능 → 유실되지 않음 ⇒
대기 정보
,입장 메시지
모두 적합 - 메시지는 컨슈머 그룹 내에서 하나의 컨슈머에게만 전달되어 중복 처리 없이 안전하게 분산 가능
- 확장성과 신뢰성 모두 확보 가능하며, 특정 세션 메시지만 효율적으로 소비할 수 있어 성능 최적화에도 유리
- 디스크 기반 로그 저장
- 강한 내구성
- 실시간 처리에는 다소 부적합
- 대용량 로그 처리에 강함
- 인프라 구성이 복잡함
- 가용성을 위해 브로커 3대 이상 권장
- 러닝 커브가 높음
- 각 SSE 서버는 대기열 입장 시 publish
- 대기열 서버에서 주기적으로 입장 메시지, 대기열 상태 publish
- 각 SSE 서버는 입장 메시지, 대기열 상태 메시지를 consume
- 각 커넥션 별 SSE 전송
SSE 서버는 각 브라우저와 연결을 유지하고 있기 때문에, 해당 서버가 관리하는 커넥션에 대한 대기열 메시지만 처리해야 합니다. 이를 위해 서버 고유의 serverId
를 기반으로 Redis Stream의 키를 구성하여, 각 서버가 자신에게 할당된 메시지만 처리하도록 했습니다.
- 단일 Consumer로도 메시지를 소비할 수 있지만,
XREADGROUP
기반의 Consumer Group을 사용하면 처리 실패 시 Redis의 PEL(Pending Entry List)에 메시지가 남아 재처리가 가능해집니다. 따라서 현재는 Group 내 Consumer가 1개뿐이지만, 신뢰성을 위해 Consumer Group 방식을 사용했습니다. - Consumer Group 내에는 여러 Consumer를 둘 수 있지만, 현재 서버는 CPU 코어가 1개이므로 병렬 처리를 하더라도 실질적인 이점이 적습니다. 따라서 현재는 단일 Consumer만을 사용하고 있습니다.
/**
* 입장 처리 및 대기열 상태 publish
* 전체 대기열을 병렬로 처리
*/
public void process() {
// 전체 대기열 목록 조회
List<Long> performanceSessionIds = queueService.getAllPerformanceSessionIds();
if (performanceSessionIds == null || performanceSessionIds.isEmpty()) {
return;
}
// 각 대기열을 병렬로 처리
performanceSessionIds.forEach(performanceSessionId ->
CompletableFuture.runAsync(() -> processQueue(performanceSessionId),
executorConfig.threadPoolTaskExecutor()));
}
/**
* 하나의 대기열에 대한 입장 처리와 상태 publish
*/
private void processQueue(Long performanceSessionId) {
// 입장할 인원 수만큼 대기열에서 제거
List<String> connectionIds = queueService.pollTopCount(performanceSessionId, entryCount);
// 각 인원을 비동기로 입장 처리
connectionIds.forEach(connectionId ->
CompletableFuture.runAsync(() -> processEntry(performanceSessionId, connectionId),
executorConfig.threadPoolTaskExecutor())
);
// 대기열 상태 publish
publishWaitingState(performanceSessionId);
}
/**
* 개별 사용자 입장 처리
*/
private void processEntry(Long performanceSessionId, String connectionId) {
// 입장 처리
queueService.enterEntryLine(performanceSessionId, connectionId);
}
- 입장 처리, 대기열 상태 Publish를 하나의 스케줄러에서 처리합니다.
- 테스트가 쉽고 메시지 처리 작업에 대한 관리가 편합니다.
- 대기열 상태가 갱신되지 않았을 때 메시지를 보낼 필요가 없습니다.
- 각 대기열에 대해 반복하는 부분은 비동기로 병렬적으로 처리할 수 있도록 했습니다.
- Stream에 publish하는 부분은 모두 비동기로 처리하여 Redis I/O 대기로 인한 처리 지연을 방지했습니다.
- Flow
- 전체 대기열을 조회합니다.
- 각 대기열별로 입장 처리(대기열에서 제거, 입장 메시지 publish)를 합니다.
- 대기열 전체 상태를 publish 합니다.
/**
* 무한 루프를 돌며 Redis Stream 메시지를 blocking 방식으로 소비
*/
private void consumeLoop() {
String consumerGroup = getConsumerGroupName();
String consumerName = getConsumerName();
String streamKey = getStreamKey();
while (true) {
consume(consumerGroup, consumerName, streamKey);
}
}
/**
* 메시지 소비
*/
public void consume(String consumerGroup, String consumerName, String streamKey) {
try {
List<MapRecord<String, Object, Object>> messages = getMessages(consumerGroup, consumerName, streamKey);
if (messages == null || messages.isEmpty()) {
return;
}
// 메시지 처리 비동기로 수행
messages.forEach(message -> CompletableFuture.runAsync(() -> {
try {
// 메시지 처리
handleMessage(message);
} catch (IOException e) {
// 연결이 끊겨서 메시지를 못보낸 경우
log.warn("Error handling message: {}", e.getMessage());
}
// 연결이 끊겨서 보내지 못한 메시지는 다시 보내지 못하므로 ACK 처리
redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, message.getId());
}, getExecutor()));
} catch (Exception e) {
log.error("Error while consuming Redis Stream: {}", e.getMessage(), e);
}
}
- 입장 메시지, 대기열 상태 정보를 Redis Stream에서 읽어오고, 각각에 대해 SSE 전송을 합니다.
- 처음에는 SSE 전송 실패 시 재처리 로직이 필요한지 고민했으나, 연결이 끊긴 유저에 대해서는 별도로 서버가 책임지지 않아도 된다고 판단해서 추가하지 않았습니다.
/**
* 트래픽에 따른 입장 인원수 계산
*/
@Component
@RequiredArgsConstructor
public class EntryCountDecider {
private final RedisMetricRepository redisMetricsRepository;
public int decideEntryCount() {
Long tps = redisMetricsRepository.getLatestTps();
if (tps == null) {
return 0;
}
return interpolate(tps, 0, 100, 50, 1);
}
/**
* 선형 보간으로 입장 인원 수 계산
*/
private int interpolate(long tps, long minTps, long maxTps, int maxCnt, int minCnt) {
// TPS가 범위 밖일 경우 최대/최소값
if (tps <= minTps)
return maxCnt;
if (tps >= maxTps)
return minCnt;
// 비율 계산
double ratio = (double)(tps - minTps) / (maxTps - minTps); // 현재 TPS가 어느 위치인지에 대한 비율
return (int)Math.round(minCnt + ratio * (maxCnt - minCnt)); // 해당 비율에 따라 입장 인원 계산
}
}
- 서킷 브레이커가 메트릭(응답시간, 실패율 등)을 통해 외부 장애를 판단하고 요청을 차단한다는 점에 착안하여, 대기열 서버에서 트래픽을 실시간으로 조절할 수 있는 기능에 대해 생각했습니다.
- 메인 서버에 부하가 클 때는 주기별 입장 인원 수를 줄이고 부하가 적을 때는 늘림으로써, 메인 서버로 가는 트래픽을 유연하게 조절하여 UX 및 서버 안정성을 높였습니다.
- 마이크로미터를 통해 수집한 메인 서버의 TPS를 Redis에 기록하고, 대기열 서버에서 해당 정보를 체크하여 입장 인원을 조절합니다. TTL을 설정하여 메인 서버 장애 여부를 체크합니다.(데이터가 없다면 장애가 발생한 것)
/**
* IP 기반 요청 제한 필터
* - 지정된 경로에 대해 IP별 요청 횟수를 제한
*/
@Component
@RequiredArgsConstructor
@Order(1)
public class IpRateLimitingFilter extends OncePerRequestFilter {
private final ObjectMapper objectMapper;
private static final int EXPIRE_MINUTES = 10; // 캐시 만료 시간
private static final int MAXIMUM_SIZE = 100_000; // 캐시 용량
private static final int TRIAL_LIMIT = 5; // 주기당 요청 가능 횟수
private static final int INTERVAL_SECONDS = 10; // 요청 주기
/**
* IP별 요청 제한 버킷 캐시
* - 10분간 요청 없으면 캐시 자동 제거
* - 최대 10만 개 IP까지 저장
*/
private final Cache<String, Bucket> bucketCache = Caffeine.newBuilder()
.expireAfterAccess(EXPIRE_MINUTES, TimeUnit.MINUTES)
.maximumSize(MAXIMUM_SIZE)
.build();
/**
* 요청 필터 처리
* - 제한 대상 경로인지 확인 후 IP 기준으로 버킷에서 토큰 소비
*/
@Override
protected void doFilterInternal(
HttpServletRequest request,
@NonNull HttpServletResponse response,
@NonNull FilterChain filterChain) throws ServletException, IOException {
// 제한 대상 경로 필터링
if (!(request.getRequestURI().equals("/api/areas")
|| request.getRequestURI().equals("/api/areas/subscribe")
)) {
filterChain.doFilter(request, response);
return;
}
String ip = getClientIp(request);
// IP별 버킷 조회 또는 생성 (없으면 자동 생성 후 캐시에 등록)
Bucket bucket = bucketCache.get(ip, k -> createNewBucket());
// 토큰이 없으면 요청 제한 (429 응답 반환)
if (!bucket.tryConsume(1)) {
response.setStatus(RsConstant.TOO_MANY_REQUESTS);
response.setContentType("application/json;charset=UTF-8");
String json = objectMapper.writeValueAsString(RsData.from(TOO_MANY_REQUESTS));
PrintWriter writer = response.getWriter();
writer.write(json);
writer.flush(); // 클라이언트에게 즉시 응답 전송
return;
}
// 토큰이 있으면 다음 필터로 요청 전달
filterChain.doFilter(request, response);
}
/**
* 새 버킷 생성
*/
private Bucket createNewBucket() {
Bandwidth limit = Bandwidth.builder()
.capacity(TRIAL_LIMIT) // 최대 요청 가능 횟수
.refillIntervally(TRIAL_LIMIT, Duration.ofSeconds(INTERVAL_SECONDS)) // 요청 가능 횟수 리필
.build();
return Bucket.builder()
.addLimit(limit)
.build();
}
/**
* 클라이언트 IP 추출
* - 프록시 환경 고려하여 X-Forwarded-For 헤더 우선 사용
*/
private String getClientIp(HttpServletRequest request) {
String xfHeader = request.getHeader("X-Forwarded-For");
return xfHeader != null ? xfHeader.split(",")[0] : request.getRemoteAddr();
}
}
- 대기 완료 후 호출할 수 있는
구역 및 좌석 조회
API에 대해서는 Rate Limiter를 적용하여 취소표를 노리는 사람들로 인해 발생하는 API 반복 호출을 방지했습니다.
서버 ID를 EC2 인스턴스 ID 또는 랜덤값으로 설정합니다.
/**
* 서버 ID 초기화
*/
@PostConstruct
public void init() {
try {
// AWS EC2 인스턴스 ID를 이용하여 서버 ID 초기화
serverId = EC2MetadataUtils.getInstanceId();
} catch (Exception e) {
log.error("Failed to get server ID, using random ID instead: ", e);
// 랜덤 값으로 서버 ID 초기화
serverId = UUID.randomUUID().toString();
}
}
대기열 서버에서 Stream에 기록할 때, Stream Key를 서버 ID로 구분해야 각 서버에서 자신이 맡은 Stream 메시지만 처리할 수 있습니다. 따라서 서버 ID를 Redis에 저장합니다.
/**
* 서버 식별자 저장소 (Redis)
*/
@Repository
@RequiredArgsConstructor
public class RedisServerRegistry implements ServerRegistry {
private static final String SERVER_PREFIX = "connection-server"; // 커넥션 서버
private final StringRedisTemplate redisTemplate;
@Override
public void save(String connectionId, String serverId) {
redisTemplate.opsForValue().set(getServerKey(connectionId), serverId);
}
@Override
public String getServerId(String connectionId) {
String serverId = redisTemplate.opsForValue().get(getServerKey(connectionId));
if (serverId == null) {
throw new NoSuchElementException("No connected server found for connection: " + connectionId);
}
return serverId;
}
@Override
public void remove(String connectionId) {
redisTemplate.delete(getServerKey(connectionId));
}
private String getServerKey(String connectionId) {
return SERVER_PREFIX + ":connection-id:" + connectionId;
}
@Override
public void clearAll() {
Set<String> keys = redisTemplate.keys(SERVER_PREFIX + ":*");
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}
}
}
/**
* SSE Connection 정보
*/
@Getter
@SuperBuilder
public abstract class SseConnection {
@Setter
private SseEmitter emitter;
private final String connectionId;
private final String serverId;
}
/**
* SSE Handler
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SseHandler {
private final static long TIMEOUT_MILLIS = -1L; // Emitter 연결 지속 시간 (무제한)
private final SseConnectionRegistry sseConnectionRegistry;
/**
* SSE 구독 시작 (Emitter 생성 및 등록)
* - connectionId로 연결 정보 등록
* - 연결 종료/타임아웃/에러 발생 시 cleanup 실행
*/
public <T extends SseConnection> T subscribe(T connection, Runnable cleanup) {
SseEmitter emitter = new SseEmitter(TIMEOUT_MILLIS);
connection.setEmitter(emitter);
sseConnectionRegistry.add(connection.getConnectionId(), connection);
emitter.onCompletion(cleanup);
emitter.onTimeout(cleanup);
emitter.onError(e -> {
log.warn("[SSE] 연결 끊김 또는 예외 (connectionId: {}): {}", connection.getConnectionId(), e.getMessage());
cleanup.run();
});
return connection;
}
/**
* connectionId로 SSE 연결 정보 조회
*/
@SuppressWarnings("unchecked")
public <T extends SseConnection> T getSession(String connectionId, Class<T> type) {
SseConnection connection = sseConnectionRegistry.getConnection(connectionId);
if (connection == null) {
return null;
}
if (!type.isInstance(connection)) {
log.warn("Invalid connection type - connectionId: {}", connectionId);
return null;
}
return (T)connection;
}
/**
* connectionId로 등록된 SseEmitter 조회
*/
public SseEmitter getEmitter(String connectionId) {
return sseConnectionRegistry.getConnection(connectionId).getEmitter();
}
/**
* connectionId로 등록된 Emitter 및 연결 정보 제거
*/
public void removeEmitter(String connectionId) {
sseConnectionRegistry.remove(connectionId);
}
/**
* 지정한 connectionId로 SSE 메시지 전송
* @param eventName SSE 이벤트 이름
* @param data 전송할 데이터
*/
public <T> void sendMessage(String eventName, String connectionId, T data) throws IOException {
SseEmitter emitter = getEmitter(connectionId);
emitter.send(
SseEmitter.event()
.name(eventName)
.data(data)
);
}
/**
* 지정한 connectionId의 SSE 연결 종료
*/
public void complete(String connectionId) {
SseEmitter emitter = getEmitter(connectionId);
if (emitter != null) {
emitter.complete();
}
}
/**
* 새로운 connectionId(UUID)를 생성
*/
public String genConnectionId() {
return UUID.randomUUID().toString();
}
}
- SSE 연결마다 랜덤 값으로 connectionId를 생성하여 커넥션 구분
- 입장이 허용될 때까지 계속 SSE 연결 상태를 유지해야하므로 Emitter 연결 시간을 무제한으로 설정
- 연결 종료/타임아웃/에러 발생 시 커넥션 정보 삭제, cleanup 실행(ex. 대기열에서 제거 등)
- SSE 서버를 대기열 서버와 분리하고, 대기열 서버도 메인 서버와 분리하여 메인 서버로부터 메트릭을 수집합니다.
- 로드 밸런서(ELB)를 추가하여 각 대기열 입장 및 구독을 여러 SSE 서버가 분담하도록 합니다.