Skip to content

Commit

Permalink
refactor: add task management logic and error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
nowgnuesLee committed Jan 3, 2025
1 parent 17d2ecf commit 77bfced
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 15 deletions.
2 changes: 1 addition & 1 deletion chat-server/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Chat Server
A chat server developed using aiohttp, asyncio, and Redis.
## Getting Started
We are currently using Python version 3.12.6. Since Redis is being used, Redis installation is required.
Currently using Python version 3.12.6. Since Redis is being used, Redis installation is required.
### Installation With pip
``` bash
pip install -r requirements.txt
Expand Down
79 changes: 65 additions & 14 deletions chat-server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from decouple import config

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)

# Max processes
MAX_PROCESSES: int = config("MAX_PROCESSES", cast=int, default=5)
MAX_PROCESSES: int = cast(int, config("MAX_PROCESSES", cast=int, default=5))

# Room name
ROOM_NAME: str = cast(str, config("ROOM_NAME", default="chat"))
Expand All @@ -35,15 +35,29 @@ async def redis_subscriber(
"""Redis 채널 구독 및 메시지 수신"""
try:
async with redis.pubsub() as pubsub:
await pubsub.subscribe(room_name)
logging.info("Redis 구독: %s", room_name)
try:
await pubsub.subscribe(room_name)
logging.info("Redis 구독: %s", room_name)

# Redis 메시지 수신 및 WebSocket으로 전달
async for message in pubsub.listen():
if message["type"] == "message":
await ws.send_str(message["data"].decode("utf-8"))
# Redis 메시지 수신 및 WebSocket으로 전달
async for message in pubsub.listen():
if message["type"] == "message":
await ws.send_str(message["data"].decode("utf-8"))
except asyncio.CancelledError:
logging.info("Redis 구독 코루틴 종료")
except Exception as e:
logging.error("Redis 구독 중 에러 발생: %s", e)
finally:
await pubsub.unsubscribe(room_name)
logging.info("Redis 구독 취소: %s", room_name)
except asyncio.CancelledError:
logging.error("Redis 구독 취소됨: %s", room_name)
logging.info("Redis Pub/Sub 연결 코루틴 취소됨")
except Exception as e:
logging.error("Redis 구독 에러: %s", e)
finally:
await redis.pubsub().close()
logging.info("Redis Pub/Sub 연결 코루틴 종료")
raise Exception("Redis Pub/Sub 연결 코루틴 종료")


async def rcv_msg(
Expand All @@ -57,6 +71,13 @@ async def rcv_msg(
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# 메시지 Redis 채널로 Publish
if msg.data == "close":
logging.info(
"클라이언트 연결 종료, 주소: %s, 유저 아이디: %s",
client_address,
user_id,
)
break
message_obj = {
"type": "message",
"userId": user_id,
Expand All @@ -77,8 +98,13 @@ async def rcv_msg(
user_id,
)
break
except asyncio.CancelledError:
logging.info("메시지 수신 코루틴 취소됨")
except Exception as e:
logging.error("메시지 수신 중 에러 발생: %s", e)
finally:
logging.info("메시지 수신 코루틴 종료")
raise Exception("메시지 수신 코루틴 종료")


# WebSocket 핸들러
Expand Down Expand Up @@ -112,22 +138,42 @@ async def websocket_handler(request):
tg.create_task(rcv_msg(ws, redis, user_id, client_address))
tg.create_task(redis_subscriber(ROOM_NAME, ws, redis))
except asyncio.CancelledError:
logging.info("비동기 작업이 취소되었습니다.")
logging.info("TaskGroup 비동기 작업이 취소되었습니다.")
except ExceptionGroup as eg:
for err in eg.args:
logging.error("TaskGroup 중 에러 발생: %s", err)
except asyncio.CancelledError:
logging.info("소켓 처리 코루틴 종료")
except Exception as e:
logging.error("소켓 처리 중 에러 발생: %s", e)

finally:
await ws.close()
if not ws.closed:
await ws.close()
logging.info("클라이언트 연결 종료")

return ws


# 비동기 작업 취소
async def cleanup_tasks(_: web.Application):
"""비동기 작업 취소"""
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logging.error("cleanup_task 중 에러 발생: %s", e)


# HTTP 서버 초기화
async def init_app():
"""HTTP 서버 초기화"""
app = web.Application()
app.router.add_get("/chat", websocket_handler) # WebSocket 경로
app.on_shutdown.append(cleanup_tasks) # 서버 종료 시 비동기 작업 취소
return app


Expand All @@ -145,12 +191,17 @@ def start_server():

# 서버 실행
if __name__ == "__main__":
max_processes = min(MAX_PROCESSES, multiprocessing.cpu_count())
try:
with ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
futures = [executor.submit(start_server) for _ in range(MAX_PROCESSES)]
with ProcessPoolExecutor(max_workers=max_processes) as executor:
futures = [executor.submit(start_server) for _ in range(max_processes)]
for future in futures:
try:
future.result()
except asyncio.CancelledError:
logging.info("비동기 작업이 취소되었습니다.")
except KeyboardInterrupt:
logging.error("서버 종료됨")
except Exception as e:
logging.error("서버 실행 중 에러 발생: %s", e)
except KeyboardInterrupt:
Expand Down

0 comments on commit 77bfced

Please sign in to comment.