Skip to content

Commit

Permalink
Some notificaitions things
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Nov 15, 2024
1 parent 24960b6 commit 9f047a9
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 60 deletions.
72 changes: 38 additions & 34 deletions src/catio/client.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
from __future__ import annotations

import asyncio
from typing import SupportsInt, TypeVar, Union
from collections import Counter
from typing import SupportsInt, TypeVar

import numpy as np
from py_ads_client import ADSSymbol
from py_ads_client.ams.ads_add_device_notification import (
ADSAddDeviceNotificationResponse,
)
from py_ads_client.ams.ads_delete_device_notification import (
ADSDeleteDeviceNotificationResponse,
)
from py_ads_client.ams.ads_device_notification import (
ADSDeviceNotificationResponse,
)
from py_ads_client.ams.ads_read import ADSReadResponse
from py_ads_client.ams.ads_read_device_info import ADSReadDeviceInfoResponse
from py_ads_client.ams.ads_read_state import ADSReadStateResponse
from py_ads_client.ams.ads_read_write import ADSReadWriteRequest, ADSReadWriteResponse
from py_ads_client.ams.ads_write import ADSWriteResponse
from py_ads_client.ams.ads_write_control import (
ADSWriteControlResponse,
)
from py_ads_client.ads_symbol import ADSSymbol
from py_ads_client.types import PLCData

from .messages import (
RESPONSE_CLASS,
ADSAddDeviceNotification,
ADSAddDeviceNotificationRequest,
ADSAddDeviceNotificationResponse,
AdsNotificationStream,
ADSReadWriteRequest,
ADSReadWriteResponse,
AMSHeader,
CommandId,
ErrorCode,
Expand Down Expand Up @@ -85,8 +73,11 @@ def __init__(
str, int
] = {} # key is variable name, value is handle
self.__device_notification_handles: dict[
int, ADSSymbol[PLCData]
SupportsInt, ADSSymbol[PLCData]
] = {} # key is handle
self.__buffer: bytes | None = None
self.__first_notification = b""
self.__task = asyncio.create_task(self._recv_forever())

@classmethod
async def connected_to(
Expand Down Expand Up @@ -125,14 +116,19 @@ async def _send_ams_message(
self.__response_events[self.__current_invoke_id] = ev
return ev

async def _recv_task(self):
async def _recv_forever(self):
while True:
header, body = await self._recv_ams_message()
assert header.error_code == ErrorCode.ERR_NOERROR, header.error_code
if header.command_id == CommandId.ADSSRVID_DEVICENOTE:
pass
if self.__buffer is not None:
if self.__first_notification:
assert len(body) == len(self.__first_notification)
else:
self.__first_notification = body
self.__buffer += body
else:
cls = RESPONSE_CLASS[header.command_id]
cls = RESPONSE_CLASS[CommandId(header.command_id)]
response = cls.from_bytes(body)
self.__response_events[header.invoke_id].set(response)

Expand All @@ -158,13 +154,13 @@ async def get_handle_by_name(self, name: str) -> int:
return handle

async def add_device_notification(
self, symbol: ADSSymbol, max_delay_ms: int = 0, cycle_time_ms: int = 0
) -> int:
self, symbol: ADSSymbol[PLCData], max_delay_ms: int = 0, cycle_time_ms: int = 0
) -> SupportsInt:
variable_handle = self.__variable_handles.get(symbol.name, None)
if variable_handle is None:
variable_handle = await self.get_handle_by_name(name=symbol.name)
self.__variable_handles[symbol.name] = variable_handle
request = ADSAddDeviceNotification(
request = ADSAddDeviceNotificationRequest(
index_group=IndexGroup.SYMVAL_BYHANDLE,
index_offset=variable_handle,
length=symbol.plc_t.bytes_length,
Expand All @@ -177,10 +173,18 @@ async def add_device_notification(
self.__device_notification_handles[response.handle] = symbol
return response.handle

async def get_notifications(self, n=1000):
lengths = set()
for _ in range(n):
response = await self._recv_ams_message()
assert isinstance(response, ADSDeviceNotificationResponse)
lengths.add(len(response.samples))
print(f"Got {n} notifications with {lengths} samples in each")
def start(self):
self.__buffer = self.__first_notification = b""

async def get_notifications(self):
buffer = self.__buffer
self.__buffer = b""
first_notification = AdsNotificationStream.from_bytes(self.__first_notification)
array = np.frombuffer(
buffer,
dtype=first_notification.get_notification_dtype(
self.__device_notification_handles
),
)
print(f"Got {len(array)} notifications with {array.dtype.fields}")
self.__notifications = []
101 changes: 81 additions & 20 deletions src/catio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
from typing import TYPE_CHECKING, Any, SupportsInt, get_type_hints

import numpy as np
from py_ads_client.ads_symbol import ADSSymbol
from py_ads_client.types import PLCData
from typing_extensions import Self, dataclass_transform

if TYPE_CHECKING:
UINT16 = SupportsInt
UINT32 = SupportsInt
UINT64 = SupportsInt
BYTES16 = bytes
NETID = Sequence[int]
else:
UINT16 = np.uint16
UINT32 = np.uint32
UINT64 = np.uint64
BYTES16 = np.dtype("S16")
NETID = np.dtype((np.uint8, 6))

Expand Down Expand Up @@ -51,20 +55,20 @@ def __init__(self, buffer: bytes = b"", *, data=b"", **kwargs):
self.data = data

def __getattr__(self, name: str) -> Any:
return self._value[name]
return self._value[name][0]

@cached_property
def dtype(self) -> np.dtype:
hints = get_type_hints(type(self))
hints.pop("data")
return np.dtype(list(hints))
return np.dtype(list(hints.items()))

@classmethod
def from_bytes(cls, buffer: bytes) -> Self:
return cls(buffer)

def to_bytes(self) -> bytes:
return self._value.tobytes()
return self._value.tobytes() + self.data


# AMS Header struct
Expand Down Expand Up @@ -428,7 +432,7 @@ class TransmissionMode(np.uint32, Enum):
# https://infosys.beckhoff.com/content/1033/tc3_ads_intro/115880971.html


class ADSAddDeviceNotification(Message):
class ADSAddDeviceNotificationRequest(Message):
index_group: IndexGroup
index_offset: UINT32
length: UINT32
Expand All @@ -438,27 +442,32 @@ class ADSAddDeviceNotification(Message):
reserved: BYTES16 = b""


class ADSAddDeviceNotificationResponse(Message):
result: ErrorCode
handle: UINT32


# Python Standard Encodings
# https://docs.python.org/3.8/library/codecs.html#standard-encodings

# A STRING constant is a string enclosed by single quotation marks.
# The characters are encoded according to the Windows 1252 character set.
# https://infosys.beckhoff.com/content/1033/tc3_plc_intro/2529327243.html

TWINCAT_STRING_ENCODING = "cp1252"

TWINCAT_WSTRING_ENCODING = "utf-16-le"

# ADS Read Write packet
# https://infosys.beckhoff.com/content/1033/tc3_grundlagen/115884043.html


class ADSReadWrite(Message):
class ADSReadWriteRequest(Message):
index_group: IndexGroup
index_offset: UINT32
read_length: UINT32
write_data: bytes

def to_bytes(self) -> bytes:
write_length = len(self.write_data)
format = f"< I I I I {write_length}s"
return struct.pack(
format,
self.index_group.value,
self.index_offset,
self.read_length,
write_length,
self.write_data,
)
write_length: UINT32
data: bytes

@classmethod
def get_handle_by_name(cls, name: str) -> Self:
Expand All @@ -467,8 +476,60 @@ def get_handle_by_name(cls, name: str) -> Self:
index_group=IndexGroup.GET_SYMHANDLE_BYNAME,
index_offset=0,
read_length=4,
write_data=data,
write_length=len(data),
data=data,
)


RESPONSE_CLASS: dict[CommandId, type[Message]] = {}
class ADSReadWriteResponse(Message):
result: ErrorCode
length: UINT32
data: bytes


# https://infosys.beckhoff.com/content/1033/tc3_grundlagen/115884043.html
class AdsNotificationStream(Message):
length: UINT32
stamps: UINT32
data: bytes

def get_notification_dtype(
self, symbols: dict[SupportsInt, ADSSymbol[PLCData]]
) -> np.dtype:
dtypes = [("_length", np.uint32), ("_stamps", np.uint32)]
assert self.stamps == 1, self.stamps
stamp_header = AdsStampHeader.from_bytes(self.data)
dtypes += [("_timestamp", np.uint64), ("_samples", np.uint32)]
data = stamp_header.data
for _ in range(int(stamp_header.samples)):
assert data, data
sample = AdsNotificationSample.from_bytes(data)
symbol = symbols[sample.handle]
assert symbol.plc_t.bytes_length == sample.size
# TODO: use numpy type from Symbol when we have it
dtypes += [
(f"_{symbol.name} handle", np.uint32),
(f"_{symbol.name} size", np.uint32),
(symbol.name, f"|u{sample.size}"),
]
data = data[8 + sample.size :]
assert data == b"", data
return np.dtype(dtypes)


class AdsStampHeader(Message):
timestamp: UINT64
samples: UINT32
data: bytes


class AdsNotificationSample(Message):
handle: UINT32
size: UINT32
data: bytes


RESPONSE_CLASS: dict[CommandId, type[Message]] = {
CommandId.ADSSRVID_READWRITE: ADSReadWriteResponse,
CommandId.ADSSRVID_ADDDEVICENOTE: ADSAddDeviceNotificationResponse,
}
10 changes: 8 additions & 2 deletions tests/test_catio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

symbol1 = ADSSymbol(name="Term 31 (EL3702).Ch1 Sample 0", plc_t=INT)
symbol2 = ADSSymbol(name="Term 30 (EL3702).Ch1 Sample 0", plc_t=INT)
symbol3 = ADSSymbol(name="Term 31 (EL3702).Ch2 Sample 0", plc_t=INT)
symbol4 = ADSSymbol(name="Term 30 (EL3702).Ch2 Sample 0", plc_t=INT)


async def make_client():
Expand All @@ -21,9 +23,13 @@ async def run():
client1 = await make_client()
await client1.add_device_notification(symbol1)
await client1.add_device_notification(symbol2)
for _ in range(3):
await client1.add_device_notification(symbol3)
await client1.add_device_notification(symbol4)
await asyncio.sleep(0.1)
client1.start()
for _ in range(10):
await asyncio.sleep(1)
await client1.get_notifications(1000)
await client1.get_notifications()
# client2 = await make_client()
#
# for _ in range(3):
Expand Down
11 changes: 7 additions & 4 deletions tests/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
TransmissionMode as OldTransmissionMode,
)

from catio.messages import ADSAddDeviceNotification, IndexGroup, TransmissionMode
from catio.messages import ADSAddDeviceNotificationRequest, IndexGroup, TransmissionMode


def test_ads_add_device_notification():
Expand All @@ -19,10 +19,11 @@ def test_ads_add_device_notification():
cycle_time_ms=2,
transmission_mode=OldTransmissionMode.ADSTRANS_SERVERCYCLE,
)
new = ADSAddDeviceNotification(
new = ADSAddDeviceNotificationRequest(
index_group=IndexGroup.GET_SYMHANDLE_BYNAME,
index_offset=np.int32(3),
index_offset=6,
length=15,
max_delay_ms=34,
cycle_time_ms=2,
transmission_mode=TransmissionMode.ADSTRANS_SERVERCYCLE,
)
Expand All @@ -33,4 +34,6 @@ def test_ads_add_device_notification():
assert len(old.to_bytes()) == len(new.to_bytes())
assert old.to_bytes().hex(" ") == new.to_bytes().hex(" ")
serialized = new.to_bytes()
assert ADSAddDeviceNotification.from_bytes(serialized).to_bytes() == serialized
assert (
ADSAddDeviceNotificationRequest.from_bytes(serialized).to_bytes() == serialized
)

0 comments on commit 9f047a9

Please sign in to comment.