Skip to content

Commit

Permalink
Move attributes into ZMQConnectionSettings
Browse files Browse the repository at this point in the history
Renamed class attributes
  • Loading branch information
OCopping committed Mar 6, 2024
1 parent 9210c5c commit 6843952
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/fastcs/connections/zmq_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
import aiozmq
import zmq


@dataclass
class ZMQConnectionSettings:
_host: str = "127.0.0.1"
_port: int = 5555
_type: int = zmq.DEALER


class ZMQConnection:
"""An adapter for a ZeroMQ data stream."""

zmq_host: str = "127.0.0.1"
zmq_port: int = 5555
zmq_type: int = zmq.DEALER
running: bool = False
def __init__(self, settings: ZMQConnectionSettings) -> None:
self._host, self._port, self._type = (settings._host, settings._port, settings._type)
self.running: bool = False
self._lock = asyncio.Lock()

Check warning on line 23 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L21-L23

Added lines #L21 - L23 were not covered by tests

def get_setup(self) -> None:
"""Print out the current configuration."""
Expand All @@ -33,9 +38,9 @@ async def start_stream(self) -> None:
print("starting stream...")

Check warning on line 38 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L38

Added line #L38 was not covered by tests

self._socket = await aiozmq.create_zmq_stream(

Check warning on line 40 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L40

Added line #L40 was not covered by tests
self.zmq_type, connect=f"tcp://{self.zmq_host}:{self.zmq_port}"
self._type, connect=f"tcp://{self._host}:{self._port}"
) # type: ignore
if self.zmq_type == zmq.SUB:
if self._type == zmq.SUB:
self._socket.transport.setsockopt(zmq.SUBSCRIBE, b"")
self._socket.transport.setsockopt(zmq.LINGER, 0)

Check warning on line 45 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L43-L45

Added lines #L43 - L45 were not covered by tests

Expand Down Expand Up @@ -66,7 +71,7 @@ async def _read_response(self) -> Optional[bytes]:
Returns:
Optional[bytes]: If received, a response is returned, else None
"""
if self.zmq_type is not zmq.DEALER:
if self._type is not zmq.DEALER:
try:
resp = await asyncio.wait_for(self._socket.read(), timeout=20)
return resp[0]
Expand Down Expand Up @@ -109,14 +114,14 @@ async def run_forever(self) -> None:

self.running = True

Check warning on line 115 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L115

Added line #L115 was not covered by tests

if self.zmq_type == zmq.DEALER:
if self._type == zmq.DEALER:
await asyncio.gather(

Check warning on line 118 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L117-L118

Added lines #L117 - L118 were not covered by tests
*[
self._process_message_queue(),
self._process_response_queue(),
]
)
elif self.zmq_type == zmq.SUB:
elif self._type == zmq.SUB:
await asyncio.gather(

Check warning on line 125 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L124-L125

Added lines #L124 - L125 were not covered by tests
*[
self._process_response_queue(),
Expand Down Expand Up @@ -145,7 +150,7 @@ async def _process_message(self, message: Iterable[bytes]) -> None:
if message is not None:
if not self._socket._closing:
try:
if self.zmq_type is not zmq.DEALER:
if self._type is not zmq.DEALER:
self._socket.write(message)

Check warning on line 154 in src/fastcs/connections/zmq_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/zmq_connection.py#L150-L154

Added lines #L150 - L154 were not covered by tests
else:
self._socket._transport._zmq_sock.send(b"", flags=zmq.SNDMORE)
Expand Down

0 comments on commit 6843952

Please sign in to comment.