From 967a621c8e36303de42f438bb723756c31f0c208 Mon Sep 17 00:00:00 2001 From: Rune Monzel Date: Mon, 12 Jun 2023 08:52:55 +0200 Subject: [PATCH] added new property 'is_valid' to check if the mmap of the numpy array is valid or not - basically this can be queued to see if the ndsharray writer is already initialized or not --- ndsharray/__init__.py | 2 +- ndsharray/ndsharray.py | 142 ++++++++++++++++++++++++----------------- 2 files changed, 83 insertions(+), 61 deletions(-) diff --git a/ndsharray/__init__.py b/ndsharray/__init__.py index be5bfa1..cd904e4 100644 --- a/ndsharray/__init__.py +++ b/ndsharray/__init__.py @@ -5,7 +5,7 @@ __author__ = 'Rune Monzel' __email__ = 'runemonzel@googlemail.com' -__version__ = '1.0.1' +__version__ = '1.1.0' __all__ = ["NdShArray", "supported_types"] diff --git a/ndsharray/ndsharray.py b/ndsharray/ndsharray.py index a9af72e..3d94b75 100644 --- a/ndsharray/ndsharray.py +++ b/ndsharray/ndsharray.py @@ -116,6 +116,8 @@ def __init__(self, name: str, array: np.ndarray = np.ndarray((0, ), dtype=np.uin self._ndarray_mmap: Union[None, mmap.mmap] = None self._ndarray_fd: Union[None, int] = None + self._is_valid = False + # buffer size of the _mmap_ndarray _bytes = self._array_to_bytes(self._array) self._buffer_size: int = len(_bytes) @@ -124,7 +126,7 @@ def __init__(self, name: str, array: np.ndarray = np.ndarray((0, ), dtype=np.uin self._mmap, self._fd = self._create_mmap(self._name, len(self.ndarray_mmap_name), r_w=self._access) # create ndarray mmap - self._create_ndarray_mmap() + self._is_valid = self._create_ndarray_mmap() if self._access == "w": self.write(array) # call write to force saving the array via mmap.flush! @@ -147,6 +149,15 @@ def name(self) -> str: """ return self._name + @property + def is_valid(self) -> bool: + """ + checks if the header of the numpy array is valid or not + + :return: + """ + return self._is_valid + @property def ndarray_mmap_name(self) -> str: """ @@ -333,36 +344,37 @@ def read(self) -> Tuple[bool, np.ndarray]: self._create_ndarray_mmap() _recreated_map = True - # first stage of checking if new data have been arrived - self._ndarray_mmap.seek(0) - _bytes = self._ndarray_mmap.read(8) - try: - _write_time = struct.unpack("d", _bytes)[0] - except ValueError: - _write_time = 0 - if _write_time <= self._last_write_time and not _recreated_map: - return False, _numpy_array - - # without checking, read the whole buffer - _bytes += self._ndarray_mmap.read() - - if len(_bytes) != self._buffer_size: - self._create_ndarray_mmap() - - _mmap_correct, _validity, _numpy_array = self._bytes_to_array(_bytes) - if not _mmap_correct: - warnings.warn("The mmap of the ndarray seems to be corrupt and the used protocol does not fit.", - BytesWarning) - - # for efficiency - self._array = _numpy_array - # for debug purpose - self._read_time_ms = (time.monotonic()-_write_time) * 1000.0 - self._last_write_time = _write_time + if self._is_valid: + # first stage of checking if new data have been arrived + self._ndarray_mmap.seek(0) + _bytes = self._ndarray_mmap.read(8) + try: + _write_time = struct.unpack("d", _bytes)[0] + except ValueError: + _write_time = 0 + if _write_time <= self._last_write_time and not _recreated_map: + return False, _numpy_array + + # without checking, read the whole buffer + _bytes += self._ndarray_mmap.read() + + if len(_bytes) != self._buffer_size: + self._create_ndarray_mmap() + + _mmap_correct, _validity, _numpy_array = self._bytes_to_array(_bytes) + if not _mmap_correct: + warnings.warn("The mmap of the ndarray seems to be corrupt and the used protocol does not fit.", + BytesWarning) + + # for efficiency + self._array = _numpy_array + # for debug purpose + self._read_time_ms = (time.monotonic()-_write_time) * 1000.0 + self._last_write_time = _write_time return _validity, _numpy_array - def _create_ndarray_mmap(self) -> None: + def _create_ndarray_mmap(self) -> bool: """ creates two mmap: - the mmap with tag 'name' just holds the mmap-tag-name of ndarray @@ -386,38 +398,48 @@ def _create_ndarray_mmap(self) -> None: self._mmap.seek(0) _ndarray_mmap_name = bytes_to_str(self._mmap.read(len(self._name)+33)) self._uuid = _ndarray_mmap_name[-32:] - - # create temporary mmap to get the dtype and dimension of the array - _tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, 8+2*n_bytes_for_int, r_w="r") - _tmp_mmap.seek(0) - _bytes = _tmp_mmap.read(8+2*n_bytes_for_int) # skip the time: +8 - idx = 8 - _np_dtype = supported_types[bytes_to_int(_bytes[idx:idx+n_bytes_for_int])] - idx += n_bytes_for_int - _np_dim = bytes_to_int(_bytes[idx:idx+n_bytes_for_int]) - self._close_mmap(_tmp_mmap, _tmp_fd) - - # create temporary mmap to get the shape of the array - _tmp_2_mmap, _tmp_2_fd = self._create_mmap(self.ndarray_mmap_name, - 8 + 2 * n_bytes_for_int + _np_dim * n_bytes_for_int, - r_w="r") - _tmp_2_mmap.seek(8+2*n_bytes_for_int) # skip the time, dtype and dimension - # read shape - _bytes += _tmp_2_mmap.read(_np_dim * n_bytes_for_int) - idx = 8 + 2 * n_bytes_for_int - _np_shape = [] - for s in range(_np_dim): - _np_shape.append(bytes_to_int(_bytes[idx:idx + n_bytes_for_int])) - idx += n_bytes_for_int - _np_shape = tuple(_np_shape) - self._close_mmap(_tmp_2_mmap, _tmp_2_fd) - - # rebuild _array and get the length of the byte array -> super lazy and inefficient... - self._array = np.ndarray(_np_shape, dtype=_np_dtype) - self._buffer_size = len(self._array_to_bytes(self._array)) - - self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name, self._buffer_size, - r_w=self._access) + try: + int(self._uuid, 16) + self._is_valid = True + except ValueError: + self._is_valid = False + + try: + if self._is_valid: + # create temporary mmap to get the dtype and dimension of the array + _tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, 8+2*n_bytes_for_int, r_w="r") + _tmp_mmap.seek(0) + _bytes = _tmp_mmap.read(8+2*n_bytes_for_int) # skip the time: +8 + idx = 8 + _np_dtype = supported_types[bytes_to_int(_bytes[idx:idx+n_bytes_for_int])] + idx += n_bytes_for_int + _np_dim = bytes_to_int(_bytes[idx:idx+n_bytes_for_int]) + self._close_mmap(_tmp_mmap, _tmp_fd) + + # create temporary mmap to get the shape of the array + _tmp_2_mmap, _tmp_2_fd = self._create_mmap(self.ndarray_mmap_name, + 8 + 2 * n_bytes_for_int + _np_dim * n_bytes_for_int, + r_w="r") + _tmp_2_mmap.seek(8+2*n_bytes_for_int) # skip the time, dtype and dimension + # read shape + _bytes += _tmp_2_mmap.read(_np_dim * n_bytes_for_int) + idx = 8 + 2 * n_bytes_for_int + _np_shape = [] + for s in range(_np_dim): + _np_shape.append(bytes_to_int(_bytes[idx:idx + n_bytes_for_int])) + idx += n_bytes_for_int + _np_shape = tuple(_np_shape) + self._close_mmap(_tmp_2_mmap, _tmp_2_fd) + + # rebuild _array and get the length of the byte array -> super lazy and inefficient... + self._array = np.ndarray(_np_shape, dtype=_np_dtype) + self._buffer_size = len(self._array_to_bytes(self._array)) + + self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name, self._buffer_size, + r_w=self._access) + except: + self.is_valid = False + return self._is_valid @staticmethod def _create_mmap(name: str, buffer_size: int, r_w: str) -> Tuple[mmap.mmap, Union[None, int]]: