Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added serial_connection support to connections #24

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/fastcs/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .ip_connection import IPConnection
from .serial_connection import SerialConnection

__all__ = ["IPConnection", "SerialConnection"]
File renamed without changes.
109 changes: 109 additions & 0 deletions src/fastcs/connections/serial_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
import socket
import warnings
from dataclasses import dataclass
from typing import Optional, Tuple

# Constants
TIMEOUT = 1.0 # Seconds
RECV_BUFFER = 4096 # Bytes


@dataclass
class SerialConnectionSettings:
ip: str = "127.0.0.1"
port: int = 7001


class SerialConnection:
def __init__(self) -> None:
# self._endpoint: Tuple[str, int] =
self._socket: socket.socket = socket.socket(

Check warning on line 21 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L21

Added line #L21 was not covered by tests
socket.AF_INET,
socket.SOCK_STREAM,
)
self._socket.settimeout(TIMEOUT)
self._lock = asyncio.Lock()

Check warning on line 26 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L25-L26

Added lines #L25 - L26 were not covered by tests

def connect(self, settings: SerialConnectionSettings):
self._endpoint: Tuple[str, int] = (settings.ip, settings.port)
self._socket.connect(self._endpoint)

Check warning on line 30 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L29-L30

Added lines #L29 - L30 were not covered by tests
# Clear initial connection messages
# TODO: Are these useful? Use as confirmation of connection?
# self._clear_socket()
self.clear_socket()

Check warning on line 34 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L34

Added line #L34 was not covered by tests

def disconnect(self):
self._socket.close()

Check warning on line 37 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L37

Added line #L37 was not covered by tests

def clear_socket(self):
"""Read from socket until we timeout"""
while True:
try:
self._socket.recv(RECV_BUFFER)
except socket.timeout:
break

Check warning on line 45 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L41-L45

Added lines #L41 - L45 were not covered by tests

@staticmethod
def _format_message(message: bytes) -> bytes:
"""Format message for printing by appending a newline char.

Args:
message: The message to format.

Returns:
The formatted message.
"""
return message + b"\n"

Check warning on line 57 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L57

Added line #L57 was not covered by tests

def _send(self, request: bytes):
"""Send a request.

Args:
request: The request string to send.
"""
self._socket.send(self._format_message(request))

Check warning on line 65 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L65

Added line #L65 was not covered by tests

async def _send_receive(self, request: bytes) -> Optional[bytes]:
"""Sends a request and attempts to decode the response.

Does not determine if the response indicates acknowledgement
from the device.

Args:
request: The request string to send.

Returns:
If the response could be decoded,
then it is returned. Otherwise None is returned.
"""
async with self._lock:
self._send(request)

Check warning on line 81 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L80-L81

Added lines #L80 - L81 were not covered by tests

if request.endswith(b"?"):
try:
response = self._socket.recv(RECV_BUFFER)
return response
except UnicodeDecodeError as e:
warnings.warn(f"{e}:\n{self._format_message(response).decode()}")
except socket.timeout:
warnings.warn("Didn't receive a response in time.")

Check warning on line 90 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L83-L90

Added lines #L83 - L90 were not covered by tests

return None

Check warning on line 92 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L92

Added line #L92 was not covered by tests

async def send_receive(self, request: bytes) -> Optional[bytes]:
"""Sends a request and attempts to decode the response.

Args:
request: The request string to send.

Returns:
The decoded response string if the
request was successful, otherwise None is returned.
"""

response = await self._send_receive(request)
if response is None:
return None

Check warning on line 107 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L105-L107

Added lines #L105 - L107 were not covered by tests

return response

Check warning on line 109 in src/fastcs/connections/serial_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/serial_connection.py#L109

Added line #L109 was not covered by tests
Loading