Skip to content

Commit

Permalink
Moved Sub and Dealer into classes using composition of base ZmqConnec…
Browse files Browse the repository at this point in the history
…tion class; Updated code to use asyncio.TaskGroup() from Python 3.11
  • Loading branch information
OCopping committed Mar 6, 2024
1 parent b1d23b8 commit 266d871
Showing 1 changed file with 133 additions and 70 deletions.
203 changes: 133 additions & 70 deletions src/fastcs/connections/zmq_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,40 @@

import asyncio
from dataclasses import dataclass
from typing import Iterable, List, Optional
from typing import Any, Coroutine, Iterable, List, Optional

import aiozmq
import zmq


@dataclass
class ZMQConnectionSettings:
_host: str = "127.0.0.1"
_port: int = 5555
_type: int = zmq.DEALER


class ZMQConnection:
"""An adapter for a ZeroMQ data stream."""

def __init__(self, settings: ZMQConnectionSettings) -> None:
self._host, self._port, self._type = (settings._host, settings._port, settings._type)
self._host, self._port = (

Check warning on line 21 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L21

Added line #L21 was not covered by tests
settings._host,
settings._port,
)
self._type: zmq.SocketType = zmq.SocketType.PAIR
self.running: bool = False
self._lock = asyncio.Lock()

self._task_list: List[Coroutine[Any, Any, Any]] = []
self._send_message_queue: asyncio.Queue = asyncio.Queue()
self._recv_message_queue: asyncio.Queue = asyncio.Queue()

Check warning on line 30 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L25-L30

Added lines #L25 - L30 were not covered by tests

async def start_stream(self) -> None:
"""Start the ZeroMQ stream."""

self._socket = await aiozmq.create_zmq_stream(

Check warning on line 35 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L35

Added line #L35 was not covered by tests
self._type, connect=f"tcp://{self._host}:{self._port}"
) # type: ignore
if self._type == zmq.SUB:
self._socket.transport.setsockopt(zmq.SUBSCRIBE, b"")
assert isinstance(self._socket.transport, aiozmq.ZmqTransport)
self._socket.transport.setsockopt(zmq.LINGER, 0)

Check warning on line 39 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L38-L39

Added lines #L38 - L39 were not covered by tests

async def close_stream(self) -> None:
Expand All @@ -51,34 +56,6 @@ def send_message(self, message: List[bytes]) -> None:
"""
self._send_message_queue.put_nowait(message)

Check warning on line 57 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L57

Added line #L57 was not covered by tests

async def _read_response(self) -> Optional[bytes]:
"""
Read and return a response once received on the socket.
Returns:
Optional[bytes]: If received, a response is returned, else None
"""
if self._type is not zmq.DEALER:
try:
resp = await asyncio.wait_for(self._socket.read(), timeout=20)
return resp[0]
except asyncio.TimeoutError:
pass
else:
discard = True
while discard:
try:
multipart_resp = await asyncio.wait_for(
self._socket.read(), timeout=20
)
if multipart_resp[0] == b"":
discard = False
resp = multipart_resp[1]
return resp
except asyncio.TimeoutError:
pass
return None

async def get_response(self) -> bytes:
"""
Get response from the received message queue.
Expand All @@ -90,43 +67,82 @@ async def get_response(self) -> bytes:

async def run_forever(self) -> None:
"""Run the ZeroMQ adapter continuously."""
self._send_message_queue: asyncio.Queue = asyncio.Queue()
self._recv_message_queue: asyncio.Queue = asyncio.Queue()

try:
if getattr(self, "_socket", None) is None:
await self.start_stream()
except Exception as e:
raise Exception("Exception when starting stream:", e)

self.running = True

Check warning on line 71 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L71

Added line #L71 was not covered by tests

if self._type == zmq.DEALER:
await asyncio.gather(
*[
self._process_message_queue(),
self._process_response_queue(),
]
)
elif self._type == zmq.SUB:
await asyncio.gather(
*[
self._process_response_queue(),
]
)
async with asyncio.TaskGroup() as tg:
for task in self._task_list:
tg.create_task(task)

Check warning on line 75 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L73-L75

Added lines #L73 - L75 were not covered by tests

def check_if_running(self):
"""Return the running state of the adapter."""
return self.running

Check warning on line 79 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L79

Added line #L79 was not covered by tests

async def _process_message_queue(self) -> None:
"""Process message queue for sending messages over the ZeroMQ stream."""
"""Process message queue for sending messages over the ZMQ stream."""
running = True
while running:
message = await self._send_message_queue.get()
await self._process_message(message)
running = self.check_if_running()

Check warning on line 87 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L83-L87

Added lines #L83 - L87 were not covered by tests

async def _process_message(self, message: Iterable[bytes]) -> None:
"""Process message to send over the ZeroMQ stream.
Args:
message (Iterable[bytes]): Message to send over the ZeroMQ stream.
"""
raise NotImplementedError

Check warning on line 95 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L95

Added line #L95 was not covered by tests

async def _read_response(self) -> Optional[bytes]:
"""
Read and return a response once received on the socket.
Returns:
Optional[bytes]: If received, a response is returned, else None
"""
raise NotImplementedError

Check warning on line 104 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L104

Added line #L104 was not covered by tests

async def _process_response_queue(self) -> None:
"""Process response message queue from the ZeroMQ stream."""
running = True
while running:
resp = await self._read_response()
if resp is None:
continue
self._recv_message_queue.put_nowait(resp)
running = self.check_if_running()

Check warning on line 114 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L108-L114

Added lines #L108 - L114 were not covered by tests


class ZMQSubConnection(ZMQConnection):
def __init__(self, settings: ZMQConnectionSettings) -> None:
super().__init__(settings)

Check warning on line 119 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L119

Added line #L119 was not covered by tests

self._task_list = [

Check warning on line 121 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L121

Added line #L121 was not covered by tests
self._process_message_queue(),
]

async def start_stream(self) -> None:
await super().start_stream()

Check warning on line 126 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L126

Added line #L126 was not covered by tests

assert isinstance(self._socket.transport, aiozmq.ZmqTransport)

Check warning on line 128 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L128

Added line #L128 was not covered by tests
# Subscribe sockets require an extra option
self._socket.transport.setsockopt(zmq.SUBSCRIBE, b"")

Check warning on line 130 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L130

Added line #L130 was not covered by tests

async def _read_response(self) -> Optional[bytes]:
"""
Read and return a response once received on the socket.
Returns:
Optional[bytes]: If received, a response is returned, else None
"""
try:
resp = await asyncio.wait_for(self._socket.read(), timeout=20)
return resp[0]
except asyncio.TimeoutError:
pass
return None

Check warning on line 144 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L139-L144

Added lines #L139 - L144 were not covered by tests

async def _process_message(self, message: Iterable[bytes]) -> None:
"""Process message to send over the ZeroMQ stream.
Expand All @@ -136,11 +152,7 @@ async def _process_message(self, message: Iterable[bytes]) -> None:
if message is not None:
if not self._socket._closing:
try:
if self._type is not zmq.DEALER:
self._socket.write(message)
else:
self._socket._transport._zmq_sock.send(b"", flags=zmq.SNDMORE)
self._socket.write(message)
self._socket.write(message)
except zmq.error.ZMQError as e:
print("ZMQ Error", e)
await asyncio.sleep(1)
Expand All @@ -154,12 +166,63 @@ async def _process_message(self, message: Iterable[bytes]) -> None:
# No message was received
pass

Check warning on line 167 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L167

Added line #L167 was not covered by tests

async def _process_response_queue(self) -> None:
"""Process response message queue from the ZeroMQ stream."""
running = True
while running:
resp = await self._read_response()
if resp is None:
continue
self._recv_message_queue.put_nowait(resp)
running = self.check_if_running()

class ZMQDealerConnection(ZMQConnection):
def __init__(self, settings: ZMQConnectionSettings) -> None:
super().__init__(settings)
self._type = zmq.SocketType.DEALER

Check warning on line 173 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L172-L173

Added lines #L172 - L173 were not covered by tests

self._task_list = [

Check warning on line 175 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L175

Added line #L175 was not covered by tests
self._process_message_queue(),
self._process_response_queue(),
]

async def _read_response(self) -> Optional[bytes]:
"""
Read and return a response once received on the socket.
Returns:
Optional[bytes]: If received, a response is returned, else None
"""
discard = True
while discard:
try:
multipart_resp = await asyncio.wait_for(

Check warning on line 190 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L187-L190

Added lines #L187 - L190 were not covered by tests
self._socket.read(),
timeout=20,
)
if multipart_resp[0] == b"":
discard = False
resp = multipart_resp[1]
return resp
except asyncio.TimeoutError:
pass
return None

Check warning on line 200 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L194-L200

Added lines #L194 - L200 were not covered by tests

async def _process_message(self, message: Iterable[bytes]) -> None:
"""Process message to send over the ZeroMQ stream.
Args:
message (Iterable[bytes]): Message to send over the ZeroMQ stream.
"""
if message is not None:
if not self._socket._closing:
try:
assert isinstance(self._socket.transport, aiozmq.ZmqTransport)
self._socket._transport._zmq_sock.send(

Check warning on line 212 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L208-L212

Added lines #L208 - L212 were not covered by tests
b"",
flags=zmq.SNDMORE,
)
self._socket.write(message)
except zmq.error.ZMQError as e:
print("ZMQ Error", e)
await asyncio.sleep(1)
except Exception as e:
print(f"Error, {e}")
print("Unable to write to ZMQ stream, trying again...")
await asyncio.sleep(1)

Check warning on line 223 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L216-L223

Added lines #L216 - L223 were not covered by tests
else:
await asyncio.sleep(5)

Check warning on line 225 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L225

Added line #L225 was not covered by tests
else:
# No message was received
pass

Check warning on line 228 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L228

Added line #L228 was not covered by tests

0 comments on commit 266d871

Please sign in to comment.