Skip to content

Commit

Permalink
Add WebSocket support (#40)
Browse files Browse the repository at this point in the history
* Add WebSocket support
* Upd README
  • Loading branch information
joente authored Apr 10, 2024
1 parent bc2788e commit 975c5c4
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 107 deletions.
91 changes: 83 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
* [Client](#client)
* [authenticate](#authenticate)
* [close](#close)
* [close_and_wait](#close_and_wait)
* [connect](#connect)
* [connect_pool](#connect_pool)
* [get_default_scope](#get_default_scope)
* [get_event_loop](#get_event_loop)
* [get_rooms](#get_rooms)
* [is_connected](#is_connected)
* [is_websocket](#is_websocket)
* [query](#query)
* [reconnect](#reconnect)
* [run](#run)
Expand All @@ -31,6 +33,7 @@
* [emit](#emit)
* [no_join](#no_join)
* [Failed packages](#failed-packages)
* [WebSockets](#websockets)
---------------------------------------

## Installation
Expand Down Expand Up @@ -71,8 +74,7 @@ async def hello_world():

finally:
# the will close the client in a nice way
client.close()
await client.wait_closed()
await client.close_and_wait()

# run the hello world example
asyncio.get_event_loop().run_until_complete(hello_world())
Expand Down Expand Up @@ -148,6 +150,16 @@ This method will return immediately so the connection may not be
closed yet after a call to `close()`. Use the [wait_closed()](#wait_closed) method
after calling this method if this is required.

### close_and_wait

```python
async Client().close_and_wait() -> None
```

Close and wait for the the connection to be closed.

This is equivalent of combining [close()](#close)) and [wait_closed()](#wait_closed).

### connect

```python
Expand All @@ -167,11 +179,12 @@ connection before using the connection.
#### Args

- *host (str)*:
A hostname, IP address, FQDN to connect to.
A hostname, IP address, FQDN or URI _(for WebSockets)_ to connect to.
- *port (int, optional)*:
Integer value between 0 and 65535 and should be the port number
where a ThingsDB node is listening to for client connections.
Defaults to 9200.
Defaults to 9200. For WebSocket connections the port must be
provided with the URI _(see host argument)_.
- *timeout (int, optional)*:
Can be be used to control the maximum time the client will
attempt to create a connection. The timeout may be set to
Expand Down Expand Up @@ -207,17 +220,18 @@ to perform the authentication.

```python
await connect_pool([
'node01.local', # address as string
'node02.local', # port will default to 9200
('node03.local', 9201), # ..or with an explicit port
'node01.local', # address or WebSocket URI as string
'node02.local', # port will default to 9200 or ignored for URI
('node03.local', 9201), # ..or with an explicit port (ignored for URI)
], "admin", "pass")
```

#### Args

- *pool (list of addresses)*:
Should be an iterable with node address strings, or tuples
with `address` and `port` combinations in a tuple or list.
with `address` and `port` combinations in a tuple or list. For WebSockets,
the address must be an URI with the port included. (e.g: `"ws://host:9270"`)
- *\*auth (str or (str, str))*:
Argument `auth` can be be either a string with a token or a
tuple with username and password. (the latter may be provided
Expand Down Expand Up @@ -282,6 +296,18 @@ Can be used to check if the client is connected.
#### Returns
`True` when the client is connected else `False`.


### is_websocket

```python
Client().is_websocket() -> bool
```

Can be used to check if the client is using a WebSocket connection.

#### Returns
`True` when the client is connected else `False`.

### query

```python
Expand Down Expand Up @@ -595,3 +621,52 @@ set_package_fail_file('/tmp/thingsdb-invalid-data.mp')
# When a package is received which fails to unpack, the data from this package
# will be stored to file.
```


## WebSockets

Since ThingsDB 1.6 has received WebSocket support. The Python client is able to use the WebSockets protocol by providing the `host` as URI.
For WebSocket connections,the `port` argument will be ignored and must be specified with the URI instead.

Default the `websockets` package is **not included** when installing this connector.

If you want to use WebSockets, make sure to install the package:

```
pip install websockets
```

For example:

```python
import asyncio
from thingsdb.client import Client

async def hello_world():
client = Client()

# replace `ws://localhost:9270` with your URI
await client.connect('ws://localhost:9270')

# for a secure connection, use wss:// and provide an SSL context, example:
# (ssl can be set either to True or False, or an SSLContext)
#
# await client.connect('wss://localhost:9270', ssl=True)

try:
# replace `admin` and `pass` with your username and password
# or use a valid token string
await client.authenticate('admin', 'pass')

# perform the hello world code...
print(await client.query('''
"Hello World!";
''')

finally:
# the will close the client in a nice way
await client.close_and_wait()

# run the hello world example
asyncio.get_event_loop().run_until_complete(hello_world())
```
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
msgpack>=0.6.2
deprecation
# Optional package:
# websockets
3 changes: 1 addition & 2 deletions test_thingsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ async def async_test_playground(self):
self.assertEqual(data, want)

finally:
client.close()
await client.wait_closed()
await client.close_and_wait()

def test_playground(self):
loop = asyncio.get_event_loop()
Expand Down
73 changes: 50 additions & 23 deletions thingsdb/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Optional, Union, Any
from concurrent.futures import CancelledError
from .buildin import Buildin
from .protocol import Proto, Protocol
from .protocol import Proto, Protocol, ProtocolWS
from ..exceptions import NodeError, AuthError
from ..util import strip_code

Expand Down Expand Up @@ -85,7 +85,7 @@ def is_connected(self) -> bool:
Returns:
bool: `True` when the client is connected else `False`.
"""
return bool(self._protocol and self._protocol.transport)
return bool(self._protocol and self._protocol.is_connected())

def set_default_scope(self, scope: str) -> None:
"""Set the default scope.
Expand Down Expand Up @@ -119,9 +119,9 @@ def close(self) -> None:
closed yet after a call to `close()`. Use the `wait_closed()` method
after calling this method if this is required.
"""
if self._protocol and self._protocol.transport:
self._reconnect = False
self._protocol.transport.close()
self._reconnect = False
if self._protocol:
self._protocol.close()

def connection_info(self) -> str:
"""Returns the current connection info as a string.
Expand All @@ -134,7 +134,7 @@ def connection_info(self) -> str:
"""
if not self.is_connected():
return 'disconnected'
socket = self._protocol.transport.get_extra_info('socket', None)
socket = self._protocol.info()
if socket is None:
return 'unknown_addr'
addr, port = socket.getpeername()[:2]
Expand Down Expand Up @@ -205,11 +205,13 @@ def connect(
Args:
host (str):
A hostname, IP address, FQDN to connect to.
A hostname, IP address, FQDN or URI (for WebSockets) to connect
to.
port (int, optional):
Integer value between 0 and 65535 and should be the port number
where a ThingsDB node is listening to for client connections.
Defaults to 9200.
Defaults to 9200. For WebSocket connections the port must be
provided with the URI (see host argument).
timeout (int, optional):
Can be be used to control the maximum time the client will
attempt to create a connection. The timeout may be set to
Expand Down Expand Up @@ -250,8 +252,19 @@ async def wait_closed(self) -> None:
Can be used after calling the `close()` method to determine when the
connection is actually closed.
"""
if self._protocol and self._protocol.close_future:
await self._protocol.close_future
if self._protocol and self._protocol.is_closing():
await self._protocol.wait_closed()

async def close_and_wait(self) -> None:
"""Close and wait for the connection to be closed.
This is equivalent to calling close() and await wait_closed()
"""
if self._protocol:
await self._protocol.close_and_wait()

def is_websocket(self) -> bool:
return self._protocol.__class__ is ProtocolWS

async def authenticate(
self,
Expand Down Expand Up @@ -538,20 +551,32 @@ def _auth_check(auth):
)
return auth

@staticmethod
def _is_websocket_host(host):
return host.startswith('ws://') or host.startswith('wss://')

async def _connect(self, timeout=5):
host, port = self._pool[self._pool_idx]
try:
conn = self._loop.create_connection(
lambda: Protocol(
if self._is_websocket_host(host):
conn = ProtocolWS(
on_connection_lost=self._on_connection_lost,
on_event=self._on_event,
loop=self._loop),
host=host,
port=port,
ssl=self._ssl)
_, self._protocol = await asyncio.wait_for(
conn,
timeout=timeout)
on_event=self._on_event).connect(uri=host, ssl=self._ssl)
self._protocol = await asyncio.wait_for(
conn,
timeout=timeout)
else:
conn = self._loop.create_connection(
lambda: Protocol(
on_connection_lost=self._on_connection_lost,
on_event=self._on_event,
loop=self._loop),
host=host,
port=port,
ssl=self._ssl)
_, self._protocol = await asyncio.wait_for(
conn,
timeout=timeout)
finally:
self._pool_idx += 1
self._pool_idx %= len(self._pool)
Expand Down Expand Up @@ -614,15 +639,17 @@ async def _reconnect_loop(self):
await self._authenticate(timeout=5)
await self._rejoin()
except Exception as e:
name = host if self._is_websocket_host(host) else \
f'{host}:{port}'
logging.error(
f'Connecting to {host}:{port} failed: '
f'Connecting to {name} failed: '
f'{e}({e.__class__.__name__}), '
f'Try next connect in {wait_time} seconds'
)
else:
if protocol and protocol.transport:
if protocol and protocol.is_connected():
# make sure the `old` connection will be dropped
self._loop.call_later(10.0, protocol.transport.close)
self._loop.call_later(10.0, protocol.close)
break

await asyncio.sleep(wait_time)
Expand Down
33 changes: 23 additions & 10 deletions thingsdb/client/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,39 @@ def __init__(self, barray: bytearray) -> None:
self.total = self.__class__.st_package.size + self.length
self.data = None

def _handle_fail_file(self, message: bytes):
if _fail_file:
try:
with open(_fail_file, 'wb') as f:
f.write(
message[self.__class__.st_package.size:self.total])
except Exception:
logging.exception('')
else:
logging.warning(
f'Wrote the content from {self} to `{_fail_file}`')

def extract_data_from(self, barray: bytearray) -> None:
try:
self.data = msgpack.unpackb(
barray[self.__class__.st_package.size:self.total],
raw=False) \
if self.length else None
except Exception as e:
if _fail_file:
try:
with open(_fail_file, 'wb') as f:
f.write(
barray[self.__class__.st_package.size:self.total])
except Exception:
logging.exception('')
else:
logging.warning(
f'Wrote the content from {self} to `{_fail_file}`')
self._handle_fail_file(barray)
raise e
finally:
del barray[:self.total]

def read_data_from(self, message: bytes) -> None:
try:
self.data = msgpack.unpackb(
message[self.__class__.st_package.size:self.total],
raw=False) \
if self.length else None
except Exception as e:
self._handle_fail_file(message)
raise e

def __repr__(self) -> str:
return '<id: {0.pid} size: {0.length} tp: {0.tp}>'.format(self)
Loading

0 comments on commit 975c5c4

Please sign in to comment.