diff --git a/python/selfie-lib/selfie_lib/ConvertToWindowsNewlines.py b/python/selfie-lib/selfie_lib/ConvertToWindowsNewlines.py new file mode 100644 index 00000000..5718a2b9 --- /dev/null +++ b/python/selfie-lib/selfie_lib/ConvertToWindowsNewlines.py @@ -0,0 +1,20 @@ +class ConvertToWindowsNewlines: + def __init__(self, sink): + self.sink = sink + + def append(self, value, start_index=None, end_index=None): + # If value is a single character + if isinstance(value, str) and len(value) == 1: + if value != "\n": + self.sink.write(value) + else: + self.sink.write("\r\n") + # If value is a CharSequence (in Python, a str) + elif isinstance(value, str): + # If start_index and end_index are provided, use the slice of the string + if start_index is not None and end_index is not None: + value_to_append = value[start_index:end_index] + else: + value_to_append = value + self.sink.write(value_to_append.replace("\n", "\r\n")) + return self diff --git a/python/selfie-lib/selfie_lib/Snapshot.py b/python/selfie-lib/selfie_lib/Snapshot.py new file mode 100644 index 00000000..9d22057a --- /dev/null +++ b/python/selfie-lib/selfie_lib/Snapshot.py @@ -0,0 +1,94 @@ +from .SnapshotValue import SnapshotValue +from collections import OrderedDict + + +class Snapshot: + def __init__(self, subject, facet_data): + self._subject = subject + self._facet_data = facet_data + + @property + def facets(self): + return OrderedDict(sorted(self._facet_data.items())) + + def __eq__(self, other): + if not isinstance(other, Snapshot): + return NotImplemented + return self._subject == other._subject and self._facet_data == other._facet_data + + def __hash__(self): + return hash((self._subject, frozenset(self._facet_data.items()))) + + def plus_facet(self, key, value): + if isinstance(value, bytes): + value = SnapshotValue.of(value) + elif isinstance(value, str): + value = SnapshotValue.of(value) + return self._plus_facet(key, value) + + def _plus_facet(self, key, value): + if not key: + raise ValueError("The empty string is reserved for the subject.") + facet_data = dict(self._facet_data) + facet_data[self._unix_newlines(key)] = value + return Snapshot(self._subject, facet_data) + + def plus_or_replace(self, key, value): + if not key: + return Snapshot(value, self._facet_data) + facet_data = dict(self._facet_data) + facet_data[self._unix_newlines(key)] = value + return Snapshot(self._subject, facet_data) + + def subject_or_facet_maybe(self, key): + if not key: + return self._subject + return self._facet_data.get(key) + + def subject_or_facet(self, key): + value = self.subject_or_facet_maybe(key) + if value is None: + raise KeyError(f"'{key}' not found in {list(self._facet_data.keys())}") + return value + + def all_entries(self): + entries = [("", self._subject)] + entries.extend(self._facet_data.items()) + return entries + + def __bytes__(self): + return f"[{self._subject} {self._facet_data}]" + + @staticmethod + def of(data): + if isinstance(data, bytes): + # Handling binary data + return Snapshot(SnapshotValue.of(data), {}) + elif isinstance(data, str): + # Handling string data + return Snapshot(SnapshotValue.of(data), {}) + elif isinstance(data, SnapshotValue): + return Snapshot(data, {}) + else: + raise TypeError("Data must be either binary or string" + data) + + @staticmethod + def of_entries(entries): + subject = None + facet_data = {} + for key, value in entries: + if not key: + if subject is not None: + raise ValueError( + f"Duplicate root snapshot.\n first: {subject}\nsecond: {value}" + ) + subject = value + else: + facet_data[key] = value + if subject is None: + subject = SnapshotValue.of("") + return Snapshot(subject, facet_data) + + @staticmethod + def _unix_newlines(string): + return string.replace("\\r\\n", "\\n") diff --git a/python/selfie-lib/selfie_lib/SnapshotReader.py b/python/selfie-lib/selfie_lib/SnapshotReader.py new file mode 100644 index 00000000..4271dac0 --- /dev/null +++ b/python/selfie-lib/selfie_lib/SnapshotReader.py @@ -0,0 +1,48 @@ +from .Snapshot import Snapshot + + +class SnapshotReader: + def __init__(self, value_reader): + self.value_reader = value_reader + + def peek_key(self): + next_key = self.value_reader.peek_key() + if next_key is None or next_key == "[end of file]": + return None + if "[" in next_key: + raise ValueError( + f"Missing root snapshot, square brackets not allowed: '{next_key}'" + ) + return next_key + + def next_snapshot(self): + root_name = self.peek_key() + snapshot = Snapshot.of(self.value_reader.next_value()) + while True: + next_key = self.value_reader.peek_key() + if next_key is None: + return snapshot + facet_idx = next_key.find("[") + if facet_idx == -1 or (facet_idx == 0 and next_key == "[end of file]"): + return snapshot + facet_root = next_key[:facet_idx] + if facet_root != root_name: + raise ValueError( + f"Expected '{next_key}' to come after '{facet_root}', not '{root_name}'" + ) + facet_end_idx = next_key.find("]", facet_idx + 1) + if facet_end_idx == -1: + raise ValueError(f"Missing ] in {next_key}") + facet_name = next_key[facet_idx + 1 : facet_end_idx] + snapshot = snapshot.plus_facet(facet_name, self.value_reader.next_value()) + + def skip_snapshot(self): + root_name = self.peek_key() + if root_name is None: + raise ValueError("No snapshot to skip") + self.value_reader.skip_value() + while True: + next_key = self.peek_key() + if next_key is None or not next_key.startswith(f"{root_name}["): + break + self.value_reader.skip_value() diff --git a/python/selfie-lib/selfie_lib/SnapshotValue.py b/python/selfie-lib/selfie_lib/SnapshotValue.py new file mode 100644 index 00000000..95f72df7 --- /dev/null +++ b/python/selfie-lib/selfie_lib/SnapshotValue.py @@ -0,0 +1,68 @@ +from abc import ABC, abstractmethod + + +def unix_newlines(string: str) -> str: + return string.replace("\r\n", "\n") + + +class SnapshotValue(ABC): + @property + def is_binary(self) -> bool: + return isinstance(self, SnapshotValueBinary) + + @abstractmethod + def value_binary(self) -> bytes: + pass + + @abstractmethod + def value_string(self) -> str: + pass + + @staticmethod + def of(data): + if isinstance(data, bytes): + return SnapshotValueBinary(data) + elif isinstance(data, str): + return SnapshotValueString(data) + elif isinstance(data, SnapshotValue): + return data + else: + raise TypeError("Unsupported type for Snapshot creation") + + +class SnapshotValueBinary(SnapshotValue): + def __init__(self, value: bytes): + self._value = value + + def value_binary(self) -> bytes: + return self._value + + def value_string(self) -> str: + raise NotImplementedError("This is a binary value.") + + def __eq__(self, other): + if isinstance(other, SnapshotValueBinary): + return self.value_binary() == other.value_binary() + return False + + def __hash__(self): + return hash(self._value) + + +class SnapshotValueString(SnapshotValue): + def __init__(self, value: str): + self._value = value + + def value_binary(self) -> bytes: + raise NotImplementedError("This is a string value.") + + def value_string(self) -> str: + return self._value + + def __eq__(self, other): + if isinstance(other, SnapshotValueString): + return self.value_string() == other.value_string() + return False + + def __hash__(self): + return hash(self._value) diff --git a/python/selfie-lib/selfie_lib/SnapshotValueReader.py b/python/selfie-lib/selfie_lib/SnapshotValueReader.py index 11d1735e..4f937619 100644 --- a/python/selfie-lib/selfie_lib/SnapshotValueReader.py +++ b/python/selfie-lib/selfie_lib/SnapshotValueReader.py @@ -1,61 +1,14 @@ import base64 - -from abc import ABC, abstractmethod -from typing import Union from .PerCharacterEscaper import PerCharacterEscaper from .ParseException import ParseException from .LineReader import LineReader +from .SnapshotValue import SnapshotValue def unix_newlines(string: str) -> str: return string.replace("\r\n", "\n") -class SnapshotValue(ABC): - @property - def is_binary(self) -> bool: - return isinstance(self, SnapshotValueBinary) - - @abstractmethod - def value_binary(self) -> bytes: - pass - - @abstractmethod - def value_string(self) -> str: - pass - - @staticmethod - def of(value: Union[bytes, str]) -> "SnapshotValue": - if isinstance(value, bytes): - return SnapshotValueBinary(value) - elif isinstance(value, str): - return SnapshotValueString(unix_newlines(value)) - else: - raise TypeError("Value must be either bytes or str") - - -class SnapshotValueBinary(SnapshotValue): - def __init__(self, value: bytes): - self._value = value - - def value_binary(self) -> bytes: - return self._value - - def value_string(self) -> str: - raise NotImplementedError("This is a binary value.") - - -class SnapshotValueString(SnapshotValue): - def __init__(self, value: str): - self._value = value - - def value_binary(self) -> bytes: - raise NotImplementedError("This is a string value.") - - def value_string(self) -> str: - return self._value - - class SnapshotValueReader: KEY_FIRST_CHAR = "╔" KEY_START = "╔═ " diff --git a/python/selfie-lib/selfie_lib/__init__.py b/python/selfie-lib/selfie_lib/__init__.py index 87b47f7a..3204174f 100644 --- a/python/selfie-lib/selfie_lib/__init__.py +++ b/python/selfie-lib/selfie_lib/__init__.py @@ -4,3 +4,6 @@ from .PerCharacterEscaper import PerCharacterEscaper as PerCharacterEscaper from .SnapshotValueReader import SnapshotValueReader as SnapshotValueReader from .ParseException import ParseException as ParseException +from .SnapshotReader import SnapshotReader as SnapshotReader +from .Snapshot import Snapshot as Snapshot +from .SnapshotValue import SnapshotValue as SnapshotValue diff --git a/python/selfie-lib/tests/SnapshotReader_test.py b/python/selfie-lib/tests/SnapshotReader_test.py new file mode 100644 index 00000000..236c6a2e --- /dev/null +++ b/python/selfie-lib/tests/SnapshotReader_test.py @@ -0,0 +1,58 @@ +from base64 import b64decode +from selfie_lib import SnapshotValueReader, Snapshot, SnapshotReader + + +class TestSnapshotReader: + def test_facet(self): + reader = SnapshotReader( + SnapshotValueReader.of( + """ +╔═ Apple ═╗ +Apple +╔═ Apple[color] ═╗ +green +╔═ Apple[crisp] ═╗ +yes +╔═ Orange ═╗ +Orange +""".strip() + ) + ) + assert reader.peek_key() == "Apple" + assert reader.peek_key() == "Apple" + apple_snapshot = ( + Snapshot.of("Apple").plus_facet("color", "green").plus_facet("crisp", "yes") + ) + assert reader.next_snapshot() == apple_snapshot + assert reader.peek_key() == "Orange" + assert reader.peek_key() == "Orange" + assert reader.next_snapshot() == Snapshot.of("Orange") + assert reader.peek_key() is None + + def test_binary(self): + reader = SnapshotReader( + SnapshotValueReader.of( + """ +╔═ Apple ═╗ +Apple +╔═ Apple[color] ═╗ base64 length 3 bytes +c2Fk +╔═ Apple[crisp] ═╗ +yes +╔═ Orange ═╗ base64 length 3 bytes +c2Fk +""".strip() + ) + ) + assert reader.peek_key() == "Apple" + assert reader.peek_key() == "Apple" + apple_snapshot = ( + Snapshot.of("Apple") + .plus_facet("color", b64decode("c2Fk")) + .plus_facet("crisp", "yes") + ) + assert reader.next_snapshot() == apple_snapshot + assert reader.peek_key() == "Orange" + assert reader.peek_key() == "Orange" + assert reader.next_snapshot() == Snapshot.of(b64decode("c2Fk")) + assert reader.peek_key() is None