Skip to content

Commit

Permalink
wb_async :: explicit use of websockets.asyncio.client; improve user-a…
Browse files Browse the repository at this point in the history
…gent with details on architecture
  • Loading branch information
adriansev committed Jan 7, 2025
1 parent 9c64bce commit a87dff2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
3 changes: 3 additions & 0 deletions alienpy/global_vars.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""alienpy:: GLOBALS"""

import os
import platform
import re
import sys
from socket import gethostname
Expand Down Expand Up @@ -42,6 +43,8 @@

HOSTNAME = gethostname()

UNAME = platform.uname()
PLATFORM_ID = f'{UNAME.machine}/{UNAME.system}/{UNAME.release}'

def get_certs_names() -> CertsInfo:
"""Provide the standard file names for used certificates"""
Expand Down
46 changes: 28 additions & 18 deletions alienpy/wb_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from typing import Optional, TYPE_CHECKING, Union

try:
import websockets.client as wb_client
from websockets.asyncio.client import connect as wb_connect
from websockets.asyncio.client import unix_connect as wb_connect_unix
import websockets.exceptions as wb_exceptions
import websockets.version as wb_version
from websockets.version import version as wb_version
from websockets.extensions import permessage_deflate as _wb_permessage_deflate
except Exception:
print("websockets module could not be imported! Make sure you can do:\npython3 -c 'import websockets.client as wb_client'", file = sys.stderr, flush = True)
Expand All @@ -30,12 +31,15 @@

from .version import ALIENPY_VERSION_STR
from .setup_logging import DEBUG, DEBUG_FILE, print_err
from .global_vars import DEBUG_TIMING, TMPDIR
from .global_vars import DEBUG_TIMING, TMPDIR, PLATFORM_ID
from .tools_nowb import deltat_ms_perf
from .connect_ssl import create_ssl_context, renewCredFilesInfo
from .async_tools import start_asyncio, syncify


PYTHON_VERSION = "{}.{}.{}".format(*sys.version_info)
USER_AGENT = f'alienpy/{ALIENPY_VERSION_STR} websockets/{wb_version} Python/{PYTHON_VERSION} {PLATFORM_ID}'

#########################
# ASYNCIO MECHANICS
#########################
Expand All @@ -58,28 +62,33 @@ async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', pat
return None
port = str(abs(int(port))) # make sure the port argument is positive

# https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#
# https://websockets.readthedocs.io/en/stable/reference/sync/client.html
QUEUE_SIZE = int(128) # maximum length of the queue that holds incoming messages
MSG_SIZE = None # int(20 * 1024 * 1024) # maximum size for incoming messages in bytes. The default value is 1 MiB. None disables the limit
PING_TIMEOUT = int(os.getenv('ALIENPY_TIMEOUT', '20')) # If the corresponding Pong frame isn’t received within ping_timeout seconds, the connection is considered unusable and is closed
PING_INTERVAL = PING_TIMEOUT # Ping frame is sent every ping_interval seconds
OPEN_TIMEOUT = int(5) # Timeout for opening the connection in seconds
CLOSE_TIMEOUT = int(10) # maximum wait time in seconds for completing the closing handshake and terminating the TCP connection
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
# we use some conservative values, higher than this might hurt the sensitivity to intreruptions

wb = None
ctx = None
# client_max_window_bits = 12, # tomcat endpoint does not allow anything other than 15, so let's just choose a mem default towards speed
deflateFact = _wb_permessage_deflate.ClientPerMessageDeflateFactory(compress_settings={'memLevel': 4})
headers_list = [('User-Agent', f'alien.py/{ALIENPY_VERSION_STR} websockets/{wb_version.version}')]
# Compressiont settings given by https://docs.python.org/3/library/zlib.html#zlib.compressobj
# client_max_window_bits = 12, # tomcat endpoint does not allow anything other than 15, so let's just choose a mem default towards speed
deflateFact = _wb_permessage_deflate.ClientPerMessageDeflateFactory(compress_settings={'memLevel': 8, 'level': 7})
headers_list = None # [('User-Agent', USER_AGENT)]
if localConnect:
fHostWSUrl = 'ws://localhost/'
logging.info('Request connection to : %s', fHostWSUrl)
socket_filename = f'{TMPDIR}/jboxpy_{str(os.getuid())}.sock'
try:
wb = await wb_client.unix_connect(socket_filename, fHostWSUrl,
max_queue = QUEUE_SIZE, max_size = MSG_SIZE,
ping_interval = PING_INTERVAL, ping_timeout = PING_TIMEOUT,
close_timeout = CLOSE_TIMEOUT, extra_headers = headers_list)
wb = await wb_connect_unix(socket_filename, fHostWSUrl,
max_queue = QUEUE_SIZE, max_size = MSG_SIZE,
ping_interval = PING_INTERVAL, ping_timeout = PING_TIMEOUT,
open_timeout = OPEN_TIMEOUT, close_timeout = CLOSE_TIMEOUT,
user_agent_header = USER_AGENT, extra_headers = headers_list)
except Exception as e:
msg = f'Could NOT establish connection (local socket) to {socket_filename}\n{sys.exc_info()}'
logging.error(msg)
Expand Down Expand Up @@ -158,13 +167,11 @@ async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', pat
try:
init_begin_wb = None
if DEBUG: init_begin_wb = time.perf_counter()
wb = await wb_client.connect(fHostWSUrl, sock = socket_endpoint, server_hostname = host, ssl = ctx, extensions=[deflateFact],
max_queue=QUEUE_SIZE, max_size=MSG_SIZE,
ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT,
close_timeout=CLOSE_TIMEOUT, extra_headers=headers_list)

if init_begin_wb:
logging.debug('WEBSOCKET DELTA: %s ms', deltat_ms_perf(init_begin_wb))
wb = await wb_connect(fHostWSUrl, sock = socket_endpoint, server_hostname = host, ssl = ctx, extensions = [deflateFact],
max_queue = QUEUE_SIZE, max_size = MSG_SIZE,
ping_interval = PING_INTERVAL, ping_timeout = PING_TIMEOUT,
open_timeout = OPEN_TIMEOUT, close_timeout = CLOSE_TIMEOUT,
user_agent_header = USER_AGENT, additional_headers = headers_list)

except wb_exceptions.InvalidStatusCode as e:
msg = f'Invalid status code {e.status_code} connecting to {socket_endpoint_addr}:{socket_endpoint_port}\n{e!r}'
Expand All @@ -179,7 +186,10 @@ async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', pat
logging.error(msg)
print_err(f'{msg}\nCheck the logfile: {DEBUG_FILE}')
return None
if wb: logging.info('CONNECTED: %s:%s', wb.remote_address[0], wb.remote_address[1])

if wb:
logging.info('CONNECTED: %s:%s', wb.remote_address[0], wb.remote_address[1])
if init_begin_wb: logging.debug('WEBSOCKET DELTA: %s ms', deltat_ms_perf(init_begin_wb))
return wb


Expand Down

0 comments on commit a87dff2

Please sign in to comment.