Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve WebSocket close implementation #200

Merged
merged 2 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions curl_cffi/requests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"Session",
"AsyncSession",
"BrowserType",
"CurlWsFlag",
"request",
"head",
"get",
Expand All @@ -16,19 +17,21 @@
"Request",
"Response",
"WebSocket",
"WebSocketError",
"WsCloseCode",
]

from functools import partial
from io import BytesIO
from typing import Callable, Dict, Optional, Tuple, Union

from ..const import CurlHttpVersion
from ..const import CurlHttpVersion, CurlWsFlag
from .cookies import Cookies, CookieTypes
from .models import Request, Response
from .errors import RequestsError
from .headers import Headers, HeaderTypes
from .session import AsyncSession, BrowserType, Session
from .websockets import WebSocket
from .websockets import WebSocket, WebSocketError, WsCloseCode

# ThreadType = Literal["eventlet", "gevent", None]

Expand Down
89 changes: 67 additions & 22 deletions curl_cffi/requests/websockets.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
from typing import Callable, Optional, Tuple
import asyncio
import struct
from enum import IntEnum
from typing import Callable, Optional, Tuple

from curl_cffi.const import CurlECode, CurlWsFlag
from curl_cffi.curl import CurlError


ON_MESSAGE_T = Callable[["WebSocket", bytes], None]
ON_ERROR_T = Callable[["WebSocket", CurlError], None]
ON_OPEN_T = Callable[["WebSocket"], None]
ON_CLOSE_T = Callable[["WebSocket"], None]
ON_CLOSE_T = Callable[["WebSocket", int, str], None]


class WsCloseCode(IntEnum):
OK = 1000
GOING_AWAY = 1001
PROTOCOL_ERROR = 1002
UNSUPPORTED_DATA = 1003
UNKNOWN = 1005
ABNORMAL_CLOSURE = 1006
INVALID_DATA = 1007
POLICY_VIOLATION = 1008
MESSAGE_TOO_BIG = 1009
MANDATORY_EXTENSION = 1010
INTERNAL_ERROR = 1011
SERVICE_RESTART = 1012
TRY_AGAIN_LATER = 1013
BAD_GATEWAY = 1014


class WebSocketError(CurlError):
pass


class WebSocket:
Expand Down Expand Up @@ -69,23 +93,44 @@ def run_forever(self):
"""
if self.on_open:
self.on_open(self)
try:
# Keep reading the messages and invoke callbacks
while self.keep_running:
try:
msg, flags = self.recv()
if self.on_message:
self.on_message(self, msg)
if flags & CurlWsFlag.CLOSE:
self.keep_running = False
except CurlError as e:
if self.on_error:
self.on_error(self, e)
finally:
if self.on_close:
self.on_close(self)

def close(self, msg: bytes = b""):

# Keep reading the messages and invoke callbacks
while self.keep_running:
try:
msg, flags = self.recv()
if self.on_message:
self.on_message(self, msg)
if flags & CurlWsFlag.CLOSE:
self.keep_running = False
# Unpack close code and reason
if len(msg) < 2:
code = WsCloseCode.UNKNOWN
reason = ""
else:
try:
code = struct.unpack_from("!H", msg)[0]
reason = msg[2:].decode()
except UnicodeDecodeError:
raise WebSocketError("Invalid close message", WsCloseCode.INVALID_DATA)
except Exception:
raise WebSocketError("Invalid close frame", WsCloseCode.PROTOCOL_ERROR)
else:
if code < 3000 and (code not in WsCloseCode or code == 1005):
raise WebSocketError("Invalid close code", WsCloseCode.PROTOCOL_ERROR)
if self.on_close:
self.on_close(self, code, reason)
except WebSocketError as e:
# Follow the spec to close the connection
# TODO: Consider adding setting to autoclose connection on error-free close
self.close(e.code)
if self.on_error:
self.on_error(self, e)
except CurlError as e:
if self.on_error:
self.on_error(self, e)

def close(self, code: int = WsCloseCode.OK, message: bytes = b""):
msg = struct.pack("!H", code) + message
# FIXME how to reset, or can a curl handle connect to two websockets?
self.send(msg, CurlWsFlag.CLOSE)
self.keep_running = False
Expand All @@ -97,13 +142,13 @@ def loop(self):
self._loop = asyncio.get_running_loop()
return self._loop

async def arecv(self):
async def arecv(self) -> Tuple[bytes, int]:
return await self.loop.run_in_executor(None, self.recv)

async def asend(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY):
return await self.loop.run_in_executor(None, self.send, payload, flags)

async def aclose(self):
await self.loop.run_in_executor(None, self.close)
async def aclose(self, code: int = WsCloseCode.OK, message: bytes = b""):
dolfies marked this conversation as resolved.
Show resolved Hide resolved
await self.loop.run_in_executor(None, self.close, code, message)
self.curl.reset()
self.session.push_curl(self.curl)