Skip to content

Commit

Permalink
Merge pull request #14 from nowgnuesLee/feature/backend/refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
nowgnuesLee authored Jan 2, 2025
2 parents 6495c07 + 390ad13 commit 850c721
Showing 1 changed file with 57 additions and 67 deletions.
124 changes: 57 additions & 67 deletions chat-server/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import sys
import json
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
Expand Down Expand Up @@ -31,29 +32,24 @@
async def redis_subscriber(
room_name: str, ws: web.WebSocketResponse, redis: aioredis.Redis
) -> None:
"""Redis 채널 구독"""
pubsub = None
"""Redis 채널 구독 및 메시지 수신"""
try:
pubsub = redis.pubsub()
await pubsub.subscribe(room_name)
logging.info(f"Redis 채널 구독 시작: {room_name}")

# Redis 메시지 수신 및 WebSocket으로 전달
async for message in pubsub.listen():
if message["type"] == "message":
await ws.send_str(message["data"].decode("utf-8"))
async with redis.pubsub() as pubsub:
await pubsub.subscribe(room_name)
logging.info("Redis 구독: %s", room_name)

# Redis 메시지 수신 및 WebSocket으로 전달
async for message in pubsub.listen():
if message["type"] == "message":
await ws.send_str(message["data"].decode("utf-8"))
except asyncio.CancelledError:
logging.info(f"Redis 구독 취소됨: {room_name}")
raise Exception("Redis 구독 취소됨")
finally:
if pubsub:
await pubsub.unsubscribe(room_name)
logging.error("Redis 구독 취소됨: %s", room_name)


async def rcv_msg(
ws: web.WebSocketResponse,
redis: aioredis.Redis,
client_name: str,
user_id: str,
client_address: str,
) -> None:
"""웹 소켓에서 메시지를 수신하고 Redis 채널로 Publish"""
Expand All @@ -63,22 +59,26 @@ async def rcv_msg(
# 메시지 Redis 채널로 Publish
message_obj = {
"type": "message",
"userId": client_name,
"userId": user_id,
"message": msg.data,
}
await redis.publish(ROOM_NAME, json.dumps(message_obj))
elif msg.type == web.WSMsgType.CLOSE:
logging.info(
f"클라이언트 연결 종료: {client_address}, 이름: {client_name}"
"클라이언트 연결 종료, 주소: %s, 유저 아이디: %s",
client_address,
user_id,
)
break
elif msg.type == web.WSMsgType.ERROR:
logging.info(
f"클라이언트 에러 발생: {client_address}, 이름: {client_name}"
logging.error(
"클라이언트 연결 종료, 주소: %s, 유저 아이디: %s",
client_address,
user_id,
)
break
except Exception as e:
raise e
logging.error("메시지 수신 중 에러 발생: %s", e)


# WebSocket 핸들러
Expand All @@ -87,48 +87,38 @@ async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request) # WebSocket 연결 준비

redis = None
tasks = []

try:
redis = await aioredis.from_url(REDIS_URL)

# 유저 카운트 증가 및 이름 할당
user_count = await redis.incr("user_count")
client_name = f"User{user_count}"
client_address = request.remote
logging.info(
f"클라이언트 연결됨: {client_address}, 룸: {ROOM_NAME}, 이름: {client_name}"
)

# 클라이언트에 이름 전달
await ws.send_json(
{
"type": "info",
"userId": client_name,
"message": f"Welcome! Your name is: {client_name}",
}
)

# Redis Pub/Sub 관련 태스크 생성
tasks = [
asyncio.create_task(
redis_subscriber(ROOM_NAME, ws, redis)
), # Redis 채널 구독
asyncio.create_task(
rcv_msg(ws, redis, client_name, client_address)
), # 메시지 수신 및 Redis 채널로 Publish
]

await asyncio.gather(*tasks)

async with aioredis.from_url(REDIS_URL) as redis:
# 유저 카운트 증가 및 이름 할당
user_count = await redis.incr("user_count")
user_id = f"User{user_count}"
client_address = request.remote
logging.info(
"클라이언트 연결됨, 주소: %s, 유저 아이디: %s",
client_address,
user_id,
)

# 클라이언트에 이름 전달
await ws.send_json(
{
"type": "info",
"userId": user_id,
"message": f"Welcome! Your name is: {user_id}",
}
)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(rcv_msg(ws, redis, user_id, client_address))
tg.create_task(redis_subscriber(ROOM_NAME, ws, redis))
except asyncio.CancelledError:
logging.info("비동기 작업이 취소되었습니다.")
except Exception as e:
logging.error(f"에러 발생: {e}")
logging.error("소켓 처리 중 에러 발생: %s", e)

finally:
for task in tasks:
task.cancel()
if redis:
await redis.close()
await ws.close()
logging.info("클라이언트 연결 종료")

return ws

Expand All @@ -143,14 +133,14 @@ async def init_app():

def start_server():
"""HTTP 서버 시작"""
logging.info(f"서버 시작됨: {multiprocessing.current_process().pid}")
logging.info(f"서버 주소: {HOST}:{PORT}")
logging.info("서버 시작됨: %d", multiprocessing.current_process().pid)
logging.info("서버 주소: %s:%d", HOST, PORT)
try:
web.run_app(init_app(), host=HOST, port=PORT, reuse_port=True)
except Exception as e:
raise e
logging.error("서버 실행 중 에러 발생: %s", e)
finally:
logging.info(f"서버 종료됨: {multiprocessing.current_process().pid}")
logging.info("서버 종료됨: %d", multiprocessing.current_process().pid)


# 서버 실행
Expand All @@ -161,9 +151,9 @@ def start_server():
for future in futures:
try:
future.result()
except asyncio.CancelledError:
logging.error("비동기 작업이 취소되었습니다.")
except Exception as e:
logging.error(f"알 수 없는 에러가 발생했습니다: {e}")
logging.error("서버 실행 중 에러 발생: %s", e)
except KeyboardInterrupt:
logging.error("종료됨")
finally:
sys.exit(0)

0 comments on commit 850c721

Please sign in to comment.