From dfe84e67f1fbf7915e492b715dbf8ed51ce3fcf4 Mon Sep 17 00:00:00 2001 From: Snuffy2 Date: Wed, 8 Jan 2025 19:44:58 -0500 Subject: [PATCH] Improve get_attr typing --- custom_components/places/sensor.py | 558 ++++++++++++++++++++--------- 1 file changed, 396 insertions(+), 162 deletions(-) diff --git a/custom_components/places/sensor.py b/custom_components/places/sensor.py index 40f8fa3..5d96b44 100644 --- a/custom_components/places/sensor.py +++ b/custom_components/places/sensor.py @@ -257,8 +257,7 @@ def _is_float(value: Any) -> bool: return False else: return True - else: - return False + return False class Places(SensorEntity): @@ -330,7 +329,7 @@ def __init__( if not self._is_attr_blank(CONF_LANGUAGE): self._set_attr( CONF_LANGUAGE, - self._get_attr(CONF_LANGUAGE).replace(" ", "").strip(), + self._get_attr_safe_str(CONF_LANGUAGE).replace(" ", "").strip(), ) self._set_attr( CONF_EXTENDED_ATTR, @@ -515,11 +514,38 @@ def _is_attr_blank(self, attr: str) -> bool: return False return True - def _get_attr(self, attr: str | None, default: Any | None = None): # None | Any: + def _get_attr(self, attr: str | None, default: Any | None = None) -> None | Any: if attr is None or (default is None and self._is_attr_blank(attr)): return None return self._internal_attr.get(attr, default) + def _get_attr_safe_str(self, attr: str | None, default: Any | None = None) -> str: + value = self._get_attr(attr=attr, default=default) + if value is not None: + try: + return str(value) + except ValueError: + return "" + return "" + + def _get_attr_safe_float(self, attr: str | None, default: Any | None = None) -> float: + value = self._get_attr(attr=attr, default=default) + if not isinstance(value, float): + return 0 + return value + + def _get_attr_safe_list(self, attr: str | None, default: Any | None = None) -> list: + value = self._get_attr(attr=attr, default=default) + if not isinstance(value, list): + return [] + return value + + def _get_attr_safe_dict(self, attr: str | None, default: Any | None = None) -> MutableMapping: + value = self._get_attr(attr=attr, default=default) + if not isinstance(value, MutableMapping): + return {} + return value + def _set_attr(self, attr: str, value: Any | None = None) -> None: if attr: self._internal_attr.update({attr: value}) @@ -646,18 +672,22 @@ async def _async_clear_since_from_state(orig_state: str) -> str: async def _async_in_zone(self) -> bool: if not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE): zone_state = self._hass.states.get( - f"{CONF_ZONE}.{(self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower()}" + f"{CONF_ZONE}.{self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower()}" ) if ( - (self._get_attr(CONF_DEVICETRACKER_ID)).split(".")[0] == CONF_ZONE + self._get_attr_safe_str(CONF_DEVICETRACKER_ID).split(".")[0] == CONF_ZONE or ( - "stationary" in (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower().startswith("statzon") - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower().startswith("ic3_statzone_") - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "away" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_home" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "notset" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_set" + "stationary" in self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower() + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE) + .lower() + .startswith("statzon") + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE) + .lower() + .startswith("ic3_statzone_") + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower() == "away" + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower() == "not_home" + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower() == "notset" + or self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE).lower() == "not_set" ) or ( zone_state is not None @@ -669,7 +699,8 @@ async def _async_in_zone(self) -> bool: return False async def _async_cleanup_attributes(self) -> None: - for attr in self._internal_attr: + attrs: MutableMapping[str, Any] = copy.deepcopy(self._internal_attr) + for attr in attrs: if self._is_attr_blank(attr): self._clear_attr(attr) @@ -711,7 +742,7 @@ async def _async_check_for_updated_entity_name(self) -> None: ) async def _async_get_zone_details(self) -> None: - if (self._get_attr(CONF_DEVICETRACKER_ID)).split(".")[0] != CONF_ZONE: + if self._get_attr_safe_str(CONF_DEVICETRACKER_ID).split(".")[0] != CONF_ZONE: self._set_attr( ATTR_DEVICETRACKER_ZONE, self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).state, @@ -741,11 +772,11 @@ async def _async_get_zone_details(self) -> None: ) if not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE_NAME) and ( - self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME) + self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE_NAME) ).lower() == self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME): self._set_attr( ATTR_DEVICETRACKER_ZONE_NAME, - (self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)).title(), + self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE_NAME).title(), ) _LOGGER.debug( "(%s) Tracked Entity Zone Name: %s", @@ -774,7 +805,7 @@ async def _async_determine_if_update_needed(self) -> int: if self._is_attr_blank(ATTR_NATIVE_VALUE) or ( isinstance(self._get_attr(ATTR_NATIVE_VALUE), str) - and (self._get_attr(ATTR_NATIVE_VALUE)).lower() + and self._get_attr_safe_str(ATTR_NATIVE_VALUE).lower() in {"none", STATE_UNKNOWN, STATE_UNAVAILABLE} ): _LOGGER.info( @@ -790,12 +821,12 @@ async def _async_determine_if_update_needed(self) -> int: ) return 2 # 0: False. 1: True. 2: False, but set direction of travel to stationary - if int(self._get_attr(ATTR_DISTANCE_TRAVELED_M)) < 10: + if int(self._get_attr_safe_float(ATTR_DISTANCE_TRAVELED_M)) < 10: _LOGGER.info( "(%s) " "Not performing update, distance traveled from last update is less than 10 m (%s m)", self._get_attr(CONF_NAME), - round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1), + round(self._get_attr_safe_float(ATTR_DISTANCE_TRAVELED_M), 1), ) return 2 # 0: False. 1: True. 2: False, but set direction of travel to stationary @@ -809,7 +840,7 @@ def _get_dict_from_url(self, url: str, name: str, dict_name: str) -> None: self._set_attr(dict_name, {}) headers: dict[str, str] = {"user-agent": f"Mozilla/5.0 (Home Assistant) {DOMAIN}/{VERSION}"} try: - get_response: requests.Response = requests.get(url, headers=headers) + get_response: requests.Response | None = requests.get(url, headers=headers) except requests.exceptions.RetryError as e: get_response = None _LOGGER.warning( @@ -908,7 +939,7 @@ def _get_dict_from_url(self, url: str, name: str, dict_name: str) -> None: if ( isinstance(get_dict, list) and len(get_dict) == 1 - and isinstance(get_dict[0], MutableMapping()) + and isinstance(get_dict[0], MutableMapping) ): self._set_attr(dict_name, get_dict[0]) return @@ -935,8 +966,8 @@ async def _async_get_map_link(self) -> None: f"{self._get_attr(ATTR_LATITUDE)}" f"&mlon={self._get_attr(ATTR_LONGITUDE)}" f"#map={self._get_attr(CONF_MAP_ZOOM)}/" - f"{str(self._get_attr(ATTR_LATITUDE))[:8]}/" - f"{str(self._get_attr(ATTR_LONGITUDE))[:9]}" + f"{self._get_attr_safe_str(ATTR_LATITUDE)[:8]}/" + f"{self._get_attr_safe_str(ATTR_LONGITUDE)[:9]}" ), ) else: @@ -999,7 +1030,7 @@ async def _async_get_gps_accuracy(self) -> int: _LOGGER.debug( "(%s) GPS Accuracy: %s", self._get_attr(CONF_NAME), - round(self._get_attr(ATTR_GPS_ACCURACY), 3), + round(self._get_attr_safe_float(ATTR_GPS_ACCURACY), 3), ) return proceed_with_update @@ -1075,7 +1106,7 @@ async def _parse_namedetails(self, osm_dict: MutableMapping[str, Any]) -> None: namedetails.get("name"), ) if not self._is_attr_blank(CONF_LANGUAGE): - for language in (self._get_attr(CONF_LANGUAGE)).split(","): + for language in self._get_attr_safe_str(CONF_LANGUAGE).split(","): if f"name:{language}" in namedetails: self._set_attr( ATTR_PLACE_NAME, @@ -1114,7 +1145,7 @@ async def _set_address_details(self, address: MutableMapping[str, Any]) -> None: ): self._set_attr( ATTR_PLACE_NAME, - self._get_attr(ATTR_OSM_DICT).get("address").get("retail"), + self._get_attr_safe_dict(ATTR_OSM_DICT).get("address", {}).get("retail"), ) _LOGGER.debug( "(%s) Place Name: %s", self._get_attr(CONF_NAME), self._get_attr(ATTR_PLACE_NAME) @@ -1148,7 +1179,6 @@ async def _set_city_details(self, address: MutableMapping[str, Any]) -> None: "quarter", "neighbourhood", ] - _LOGGER.debug("CITY_LIST: %s", CITY_LIST) for city_type in CITY_LIST: with contextlib.suppress(ValueError): POSTAL_TOWN_LIST.remove(city_type) @@ -1161,7 +1191,6 @@ async def _set_city_details(self, address: MutableMapping[str, Any]) -> None: address.get(city_type), ) break - _LOGGER.debug("POSTAL_TOWN_LIST: %s", POSTAL_TOWN_LIST) for postal_town_type in POSTAL_TOWN_LIST: with contextlib.suppress(ValueError): NEIGHBOURHOOD_LIST.remove(postal_town_type) @@ -1171,7 +1200,6 @@ async def _set_city_details(self, address: MutableMapping[str, Any]) -> None: address.get(postal_town_type), ) break - _LOGGER.debug("NEIGHBOURHOOD_LIST: %s", NEIGHBOURHOOD_LIST) for neighbourhood_type in NEIGHBOURHOOD_LIST: if neighbourhood_type in address: self._set_attr( @@ -1183,12 +1211,12 @@ async def _set_city_details(self, address: MutableMapping[str, Any]) -> None: if not self._is_attr_blank(ATTR_CITY): self._set_attr( ATTR_CITY_CLEAN, - (self._get_attr(ATTR_CITY)).replace(" Township", "").strip(), + self._get_attr_safe_str(ATTR_CITY).replace(" Township", "").strip(), ) - if (self._get_attr(ATTR_CITY_CLEAN)).startswith("City of"): + if self._get_attr_safe_str(ATTR_CITY_CLEAN).startswith("City of"): self._set_attr( ATTR_CITY_CLEAN, - f"{self._get_attr(ATTR_CITY_CLEAN)[8:]} City", + f"{self._get_attr_safe_str(ATTR_CITY_CLEAN)[8:]} City", ) async def _set_region_details(self, address: MutableMapping[str, Any]) -> None: @@ -1220,7 +1248,7 @@ async def _set_region_details(self, address: MutableMapping[str, Any]) -> None: if "postcode" in address: self._set_attr( ATTR_POSTAL_CODE, - self._get_attr(ATTR_OSM_DICT).get("address").get("postcode"), + self._get_attr_safe_dict(ATTR_OSM_DICT).get("address", {}).get("postcode"), ) async def _parse_miscellaneous(self, osm_dict: MutableMapping[str, Any]) -> None: @@ -1233,7 +1261,7 @@ async def _parse_miscellaneous(self, osm_dict: MutableMapping[str, Any]) -> None if "osm_id" in osm_dict: self._set_attr( ATTR_OSM_ID, - str(self._get_attr(ATTR_OSM_DICT).get("osm_id")), + str(self._get_attr_safe_dict(ATTR_OSM_DICT).get("osm_id", "")), ) if "osm_type" in osm_dict: self._set_attr( @@ -1243,7 +1271,7 @@ async def _parse_miscellaneous(self, osm_dict: MutableMapping[str, Any]) -> None if ( not self._is_attr_blank(ATTR_PLACE_CATEGORY) - and (self._get_attr(ATTR_PLACE_CATEGORY)).lower() == "highway" + and self._get_attr_safe_str(ATTR_PLACE_CATEGORY).lower() == "highway" and "namedetails" in osm_dict and osm_dict.get("namedetails") is not None and "ref" in osm_dict["namedetails"] @@ -1270,7 +1298,7 @@ async def _set_place_name_no_dupe(self) -> None: dupe_attributes_check: list[str] = [] dupe_attributes_check.extend( [ - self._get_attr(attr) + self._get_attr_safe_str(attr) for attr in PLACE_NAME_DUPLICATE_LIST if not self._is_attr_blank(attr) ] @@ -1291,15 +1319,15 @@ async def _async_build_formatted_place(self) -> None: formatted_place_array: list[str] = [] if not await self._async_in_zone(): if not self._is_attr_blank(ATTR_DRIVING) and "driving" in ( - self._get_attr(ATTR_DISPLAY_OPTIONS_LIST) + self._get_attr_safe_list(ATTR_DISPLAY_OPTIONS_LIST) ): - formatted_place_array.append(self._get_attr(ATTR_DRIVING)) + formatted_place_array.append(self._get_attr_safe_str(ATTR_DRIVING)) # Don't use place name if the same as another attributes use_place_name: bool = True sensor_attributes_values: list[str] = [] sensor_attributes_values.extend( [ - self._get_attr(attr) + self._get_attr_safe_str(attr) for attr in PLACE_NAME_DUPLICATE_LIST if not self._is_attr_blank(attr) ] @@ -1316,11 +1344,11 @@ async def _async_build_formatted_place(self) -> None: if not use_place_name: if ( not self._is_attr_blank(ATTR_PLACE_TYPE) - and (self._get_attr(ATTR_PLACE_TYPE)).lower() != "unclassified" - and (self._get_attr(ATTR_PLACE_CATEGORY)).lower() != "highway" + and self._get_attr_safe_str(ATTR_PLACE_TYPE).lower() != "unclassified" + and self._get_attr_safe_str(ATTR_PLACE_CATEGORY).lower() != "highway" ): formatted_place_array.append( - (self._get_attr(ATTR_PLACE_TYPE)) + self._get_attr_safe_str(ATTR_PLACE_TYPE) .title() .replace("Proposed", "") .replace("Construction", "") @@ -1328,110 +1356,282 @@ async def _async_build_formatted_place(self) -> None: ) elif ( not self._is_attr_blank(ATTR_PLACE_CATEGORY) - and (self._get_attr(ATTR_PLACE_CATEGORY)).lower() != "highway" + and self._get_attr_safe_str(ATTR_PLACE_CATEGORY).lower() != "highway" ): formatted_place_array.append( - (self._get_attr(ATTR_PLACE_CATEGORY)).title().strip() + self._get_attr_safe_str(ATTR_PLACE_CATEGORY).title().strip() ) street: str | None = None if self._is_attr_blank(ATTR_STREET) and not self._is_attr_blank(ATTR_STREET_REF): - street = (self._get_attr(ATTR_STREET_REF)).strip() + street = self._get_attr_safe_str(ATTR_STREET_REF).strip() _LOGGER.debug("(%s) Using street_ref: %s", self._get_attr(CONF_NAME), street) elif not self._is_attr_blank(ATTR_STREET): if ( not self._is_attr_blank(ATTR_PLACE_CATEGORY) - and (self._get_attr(ATTR_PLACE_CATEGORY)).lower() == "highway" + and self._get_attr_safe_str(ATTR_PLACE_CATEGORY).lower() == "highway" and not self._is_attr_blank(ATTR_PLACE_TYPE) - and (self._get_attr(ATTR_PLACE_TYPE)).lower() in {"motorway", "trunk"} + and self._get_attr_safe_str(ATTR_PLACE_TYPE).lower() + in {"motorway", "trunk"} and not self._is_attr_blank(ATTR_STREET_REF) ): - street = (self._get_attr(ATTR_STREET_REF)).strip() + street = self._get_attr_safe_str(ATTR_STREET_REF).strip() _LOGGER.debug( "(%s) Using street_ref: %s", self._get_attr(CONF_NAME), street ) else: - street = (self._get_attr(ATTR_STREET)).strip() + street = self._get_attr_safe_str(ATTR_STREET).strip() _LOGGER.debug("(%s) Using street: %s", self._get_attr(CONF_NAME), street) if street and self._is_attr_blank(ATTR_STREET_NUMBER): formatted_place_array.append(street) elif street and not self._is_attr_blank(ATTR_STREET_NUMBER): formatted_place_array.append( - f"{str(self._get_attr(ATTR_STREET_NUMBER)).strip()} {street}" + f"{self._get_attr_safe_str(ATTR_STREET_NUMBER).strip()} {street}" ) if ( not self._is_attr_blank(ATTR_PLACE_TYPE) - and (self._get_attr(ATTR_PLACE_TYPE)).lower() == "house" + and self._get_attr_safe_str(ATTR_PLACE_TYPE).lower() == "house" and not self._is_attr_blank(ATTR_PLACE_NEIGHBOURHOOD) ): - formatted_place_array.append((self._get_attr(ATTR_PLACE_NEIGHBOURHOOD)).strip()) + formatted_place_array.append( + self._get_attr_safe_str(ATTR_PLACE_NEIGHBOURHOOD).strip() + ) else: - formatted_place_array.append((self._get_attr(ATTR_PLACE_NAME)).strip()) + formatted_place_array.append(self._get_attr_safe_str(ATTR_PLACE_NAME).strip()) if not self._is_attr_blank(ATTR_CITY_CLEAN): - formatted_place_array.append((self._get_attr(ATTR_CITY_CLEAN)).strip()) + formatted_place_array.append(self._get_attr_safe_str(ATTR_CITY_CLEAN).strip()) elif not self._is_attr_blank(ATTR_CITY): - formatted_place_array.append((self._get_attr(ATTR_CITY)).strip()) + formatted_place_array.append(self._get_attr_safe_str(ATTR_CITY).strip()) elif not self._is_attr_blank(ATTR_COUNTY): - formatted_place_array.append((self._get_attr(ATTR_COUNTY)).strip()) + formatted_place_array.append(self._get_attr_safe_str(ATTR_COUNTY).strip()) if not self._is_attr_blank(ATTR_STATE_ABBR): - formatted_place_array.append(self._get_attr(ATTR_STATE_ABBR)) + formatted_place_array.append(self._get_attr_safe_str(ATTR_STATE_ABBR)) else: - formatted_place_array.append((self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)).strip()) + formatted_place_array.append( + self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE_NAME).strip() + ) formatted_place: str = ", ".join(item for item in formatted_place_array) formatted_place = formatted_place.replace("\n", " ").replace(" ", " ").strip() self._set_attr(ATTR_FORMATTED_PLACE, formatted_place) - async def _async_build_from_advanced_options(self, curr_options: str) -> None: - def has_matching_delimiters(option: str) -> bool: - return option.count("[") == option.count("]") and option.count("(") == option.count(")") - - async def process_option( - opt: str, incl=None, excl=None, incl_attr=None, excl_attr=None - ) -> None: - if not opt: - return - ret_state = await self._async_get_option_state( - opt.strip(), incl, excl, incl_attr, excl_attr + async def _do_bracket_parens_count_match(self, curr_options: str) -> bool: + if curr_options.count("[") != curr_options.count("]"): + _LOGGER.error( + "(%s) [adv_options] Bracket Count Mismatch: %s", + self._get_attr(CONF_NAME), + curr_options, ) - if ret_state: - self._adv_options_state_list.append(ret_state) - _LOGGER.debug( - "(%s) [adv_options] Updated state list: %s", - self._get_attr(CONF_NAME), - self._adv_options_state_list, - ) - - if not curr_options or not has_matching_delimiters(curr_options): + return False + if curr_options.count("(") != curr_options.count(")"): _LOGGER.error( - "(%s) [adv_options] Invalid syntax in options: %s", + "(%s) [adv_options] Parenthesis Count Mismatch: %s", self._get_attr(CONF_NAME), curr_options, ) + return False + return True + + async def _async_build_from_advanced_options(self, curr_options: str): + _LOGGER.debug("(%s) [adv_options] Options: %s", self._get_attr(CONF_NAME), curr_options) + if not await self._do_bracket_parens_count_match(curr_options) or not curr_options: return + incl: list[str] = [] + excl: list[str] = [] + incl_attr: MutableMapping[str, Any] = {} + excl_attr: MutableMapping[str, Any] = {} + none_opt: str | None = None + next_opt: str | None = None + if "[" in curr_options or "(" in curr_options: + # _LOGGER.debug("(%s) [adv_options] Options has a [ or ( and optional ,", self._get_attr(CONF_NAME)) + comma_num: int = curr_options.find(",") + bracket_num: int = curr_options.find("[") + paren_num: int = curr_options.find("(") + if ( + comma_num != -1 + and (bracket_num == -1 or comma_num < bracket_num) + and (paren_num == -1 or comma_num < paren_num) + ): + # Comma is first symbol + # _LOGGER.debug("(%s) [adv_options] Comma is First", self._get_attr(CONF_NAME)) + opt: str = curr_options[:comma_num] + # _LOGGER.debug("(%s) [adv_options] Option: %s", self._get_attr(CONF_NAME), opt) + if opt is not None and opt: + ret_state = await self._async_get_option_state(opt.strip()) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) + next_opt = curr_options[(comma_num + 1) :] + # _LOGGER.debug("(%s) [adv_options] Next Options: %s",self._get_attr(CONF_NAME), next_opt) + if next_opt is not None and next_opt: + await self._async_build_from_advanced_options(next_opt.strip()) + # _LOGGER.debug("(%s) [adv_options] Back from recursion", self._get_attr(CONF_NAME)) + return + if ( + bracket_num != -1 + and (comma_num == -1 or bracket_num < comma_num) + and (paren_num == -1 or bracket_num < paren_num) + ): + # Bracket is first symbol + # _LOGGER.debug("(%s) [adv_options] Bracket is First", self._get_attr(CONF_NAME)) + opt = curr_options[:bracket_num] + # _LOGGER.debug("(%s) [adv_options] Option: %s", self._get_attr(CONF_NAME), opt) + none_opt, next_opt = await self._async_parse_bracket(curr_options[bracket_num:]) + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == "(": + # Parse Parenthesis + incl, excl, incl_attr, excl_attr, next_opt = await self._async_parse_parens( + next_opt + ) - while curr_options: - first_char = curr_options[0] - if first_char in "[(": - parse_func = ( - self._async_parse_bracket if first_char == "[" else self._async_parse_parens + if opt is not None and opt: + ret_state = await self._async_get_option_state( + opt.strip(), incl, excl, incl_attr, excl_attr + ) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) + elif none_opt is not None and none_opt: + await self._async_build_from_advanced_options(none_opt.strip()) + # _LOGGER.debug("(%s) [adv_options] Back from recursion", self._get_attr(CONF_NAME)) + + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == ",": + next_opt = next_opt[1:] + # _LOGGER.debug("(%s) [adv_options] Next Options: %s", self._get_attr(CONF_NAME), next_opt) + if next_opt is not None and next_opt: + await self._async_build_from_advanced_options(next_opt.strip()) + # _LOGGER.debug("(%s) [adv_options] Back from recursion", self._get_attr(CONF_NAME)) + return + if ( + paren_num != -1 + and (comma_num == -1 or paren_num < comma_num) + and (bracket_num == -1 or paren_num < bracket_num) + ): + # Parenthesis is first symbol + # _LOGGER.debug("(%s) [adv_options] Parenthesis is First", self._get_attr(CONF_NAME)) + opt = curr_options[:paren_num] + _LOGGER.debug("(%s) [adv_options] Option: %s", self._get_attr(CONF_NAME), opt) + incl, excl, incl_attr, excl_attr, next_opt = await self._async_parse_parens( + curr_options[paren_num:] ) - incl, excl, incl_attr, excl_attr, next_opt = await parse_func(curr_options) - if next_opt and next_opt[0] in "[(": + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == "[": + # Parse Bracket none_opt, next_opt = await self._async_parse_bracket(next_opt) - if none_opt: + + if opt is not None and opt: + ret_state = await self._async_get_option_state( + opt.strip(), incl, excl, incl_attr, excl_attr + ) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) + elif none_opt is not None and none_opt: await self._async_build_from_advanced_options(none_opt.strip()) - await process_option( - curr_options[: curr_options.find(first_char)], incl, excl, incl_attr, excl_attr - ) - curr_options = next_opt.lstrip(",") if next_opt else None - elif "," in curr_options: - options = curr_options.split(",") - for opt in options: - await process_option(opt) - break - else: - await process_option(curr_options) - break + # _LOGGER.debug("(%s) [adv_options] Back from recursion", self._get_attr(CONF_NAME)) + + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == ",": + next_opt = next_opt[1:] + # _LOGGER.debug("(%s) [adv_options] Next Options: %s", self._get_attr(CONF_NAME), next_opt) + if next_opt is not None and next_opt: + await self._async_build_from_advanced_options(next_opt.strip()) + # _LOGGER.debug("(%s) [adv_options] Back from recursion", self._get_attr(CONF_NAME)) + return + return + # _LOGGER.debug("(%s) [adv_options] Options has , but no [ or (, splitting", self._get_attr(CONF_NAME)) + if "," in curr_options: + await self._process_advanced_only_commas(curr_options=curr_options) + return + + # _LOGGER.debug("(%s) [adv_options] Options should just be a single term", self._get_attr(CONF_NAME)) + await self._process_advanced_single_term(curr_options=curr_options) + return + + async def _process_advanced_only_commas(self, curr_options: str) -> None: + # _LOGGER.debug("(%s) [adv_options] Options has , but no [ or (, splitting", self._get_attr(CONF_NAME)) + for opt in curr_options.split(","): + if opt is not None and opt: + ret_state = await self._async_get_option_state(opt.strip()) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) + + async def _process_advanced_single_term(self, curr_options: str) -> None: + ret_state = await self._async_get_option_state(curr_options.strip()) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) + + # async def _async_build_from_advanced_options(self, curr_options: str | None) -> None: + # def has_matching_delimiters(option: str) -> bool: + # return option.count("[") == option.count("]") and option.count("(") == option.count(")") + + # async def process_option( + # opt: str, incl=None, excl=None, incl_attr=None, excl_attr=None + # ) -> None: + # if not opt: + # return + # ret_state = await self._async_get_option_state( + # opt.strip(), incl, excl, incl_attr, excl_attr + # ) + # if ret_state: + # self._adv_options_state_list.append(ret_state) + # _LOGGER.debug( + # "(%s) [adv_options] Updated state list: %s", + # self._get_attr(CONF_NAME), + # self._adv_options_state_list, + # ) + + # if not curr_options or not has_matching_delimiters(curr_options): + # _LOGGER.error( + # "(%s) [adv_options] Invalid syntax in options: %s", + # self._get_attr(CONF_NAME), + # curr_options, + # ) + # return + + # while curr_options: + # first_char: str = curr_options[0] + # if first_char in "[(": + # parse_func = ( + # self._async_parse_bracket if first_char == "[" else self._async_parse_parens + # ) + # incl, excl, incl_attr, excl_attr, next_opt = await parse_func( + # curr_options=curr_options + # ) + # if next_opt and next_opt[0] in "[(": + # none_opt, next_opt = await self._async_parse_bracket(next_opt) + # if none_opt: + # await self._async_build_from_advanced_options(curr_options=none_opt.strip()) + # await process_option( + # curr_options[: curr_options.find(first_char)], incl, excl, incl_attr, excl_attr + # ) + # curr_options = next_opt.lstrip(",") if next_opt else None + # elif "," in curr_options: + # options: list[str] = curr_options.split(",") + # for opt in options: + # await process_option(opt) + # break + # else: + # await process_option(curr_options) + # break async def _async_parse_parens(self, curr_options: str): incl: list = [] @@ -1573,7 +1773,7 @@ async def _async_get_option_state( excl_attr = {} if excl_attr is None else excl_attr if opt: opt = str(opt).lower().strip() - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] Option: {opt}") + _LOGGER.debug("(%s) [get_option_state] Option: %s", self._get_attr(CONF_NAME), opt) out: str | None = self._get_attr(DISPLAY_OPTIONS_MAP.get(opt)) if ( DISPLAY_OPTIONS_MAP.get(opt) in {ATTR_DEVICETRACKER_ZONE, ATTR_DEVICETRACKER_ZONE_NAME} @@ -1581,10 +1781,14 @@ async def _async_get_option_state( ): out = None _LOGGER.debug("(%s) [get_option_state] State: %s", self._get_attr(CONF_NAME), out) - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl list: {incl}") - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl list: {excl}") - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_attr dict: {incl_attr}") - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl_attr dict: {excl_attr}") + _LOGGER.debug("(%s) [get_option_state] incl list: %s", self._get_attr(CONF_NAME), incl) + _LOGGER.debug("(%s) [get_option_state] excl list: %s", self._get_attr(CONF_NAME), excl) + _LOGGER.debug( + "(%s) [get_option_state] incl_attr dict: %s", self._get_attr(CONF_NAME), incl_attr + ) + _LOGGER.debug( + "(%s) [get_option_state] excl_attr dict: %s", self._get_attr(CONF_NAME), excl_attr + ) if out: if ( incl @@ -1595,8 +1799,15 @@ async def _async_get_option_state( out = None if incl_attr: for attr, states in incl_attr.items(): - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_attr: {attr} / State: {self._get_attr(DISPLAY_OPTIONS_MAP.get(attr))}") - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_states: {states}") + _LOGGER.debug( + "(%s) [get_option_state] incl_attr: %s / State: %s", + self._get_attr(CONF_NAME), + attr, + self._get_attr(DISPLAY_OPTIONS_MAP.get(attr)), + ) + _LOGGER.debug( + "(%s) [get_option_state] incl_states: %s", self._get_attr(CONF_NAME), states + ) map_attr: str | None = DISPLAY_OPTIONS_MAP.get(attr) if ( not map_attr @@ -1606,8 +1817,15 @@ async def _async_get_option_state( out = None if excl_attr: for attr, states in excl_attr.items(): - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl_attr: {attr} / State: {self._get_attr(DISPLAY_OPTIONS_MAP.get(attr))}") - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl_states: {states}") + _LOGGER.debug( + "(%s) [get_option_state] excl_attr: %s / State: %s", + self._get_attr(CONF_NAME), + attr, + self._get_attr(DISPLAY_OPTIONS_MAP.get(attr)), + ) + _LOGGER.debug( + "(%s) [get_option_state] excl_states: %s", self._get_attr(CONF_NAME), states + ) if self._get_attr(DISPLAY_OPTIONS_MAP.get(attr)) in states: out = None _LOGGER.debug( @@ -1666,7 +1884,7 @@ async def _async_compile_state_from_advanced_options(self) -> None: ) async def _async_build_state_from_display_options(self) -> None: - display_options = self._get_attr(ATTR_DISPLAY_OPTIONS_LIST) + display_options = self._get_attr_safe_list(ATTR_DISPLAY_OPTIONS_LIST) _LOGGER.debug( "(%s) Building State from Display Options: %s", self._get_attr(CONF_NAME), @@ -1676,7 +1894,7 @@ async def _async_build_state_from_display_options(self) -> None: def add_to_display(option_key: str, attr_key: str, condition: bool = True) -> None: """Add attribute value to user_display if the conditions are met.""" if option_key in display_options and not self._is_attr_blank(attr_key) and condition: - user_display.append(self._get_attr(attr_key)) + user_display.append(self._get_attr_safe_str(attr_key)) user_display: list[str] = [] @@ -1699,7 +1917,7 @@ def add_to_display(option_key: str, attr_key: str, condition: bool = True) -> No if not self._is_attr_blank(ATTR_PLACE_NAME) and self._get_attr( ATTR_PLACE_NAME ) != self._get_attr(ATTR_STREET): - user_display.append(self._get_attr(ATTR_PLACE_NAME)) + user_display.append(self._get_attr_safe_str(ATTR_PLACE_NAME)) for key, attr in { "place_category": ATTR_PLACE_CATEGORY, "place_type": ATTR_PLACE_TYPE, @@ -1739,7 +1957,7 @@ def add_to_display(option_key: str, attr_key: str, condition: bool = True) -> No else option ) if not self._is_attr_blank(attr_key): - user_display.append(self._get_attr(attr_key)) + user_display.append(self._get_attr_safe_str(attr_key)) # Set the final state if user_display: @@ -1752,11 +1970,11 @@ def add_to_display(option_key: str, attr_key: str, condition: bool = True) -> No async def _async_get_extended_attr(self) -> None: if not self._is_attr_blank(ATTR_OSM_ID) and not self._is_attr_blank(ATTR_OSM_TYPE): - if (self._get_attr(ATTR_OSM_TYPE)).lower() == "node": + if self._get_attr_safe_str(ATTR_OSM_TYPE).lower() == "node": osm_type_abbr = "N" - elif (self._get_attr(ATTR_OSM_TYPE)).lower() == "way": + elif self._get_attr_safe_str(ATTR_OSM_TYPE).lower() == "way": osm_type_abbr = "W" - elif (self._get_attr(ATTR_OSM_TYPE)).lower() == "relation": + elif self._get_attr_safe_str(ATTR_OSM_TYPE).lower() == "relation": osm_type_abbr = "R" osm_details_url: str = ( @@ -1782,15 +2000,20 @@ async def _async_get_extended_attr(self) -> None: if ( not self._is_attr_blank(ATTR_OSM_DETAILS_DICT) - and "extratags" in (self._get_attr(ATTR_OSM_DETAILS_DICT)) - and (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") is not None - and "wikidata" in (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") - and (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags").get("wikidata") + and "extratags" in self._get_attr_safe_dict(ATTR_OSM_DETAILS_DICT) + and self._get_attr_safe_dict(ATTR_OSM_DETAILS_DICT).get("extratags") is not None + and "wikidata" + in self._get_attr_safe_dict(ATTR_OSM_DETAILS_DICT).get("extratags", {}) + and self._get_attr_safe_dict(ATTR_OSM_DETAILS_DICT) + .get("extratags", {}) + .get("wikidata") is not None ): self._set_attr( ATTR_WIKIDATA_ID, - (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags").get("wikidata"), + self._get_attr_safe_dict(ATTR_OSM_DETAILS_DICT) + .get("extratags", {}) + .get("wikidata"), ) self._set_attr(ATTR_WIKIDATA_DICT, {}) @@ -1906,7 +2129,7 @@ async def _async_get_initial_last_place_name(self) -> None: ) async def _async_update_coordinates_and_distance(self) -> int: - last_distance_traveled_m = self._get_attr(ATTR_DISTANCE_FROM_HOME_M) + last_distance_traveled_m: float = self._get_attr_safe_float(ATTR_DISTANCE_FROM_HOME_M) proceed_with_update = 1 # 0: False. 1: True. 2: False, but set direction of travel to stationary @@ -1939,20 +2162,20 @@ async def _async_update_coordinates_and_distance(self) -> int: self._set_attr( ATTR_DISTANCE_FROM_HOME_M, distance( - float(self._get_attr(ATTR_LATITUDE)), - float(self._get_attr(ATTR_LONGITUDE)), - float(self._get_attr(ATTR_HOME_LATITUDE)), - float(self._get_attr(ATTR_HOME_LONGITUDE)), + float(self._get_attr_safe_str(ATTR_LATITUDE)), + float(self._get_attr_safe_str(ATTR_LONGITUDE)), + float(self._get_attr_safe_str(ATTR_HOME_LATITUDE)), + float(self._get_attr_safe_str(ATTR_HOME_LONGITUDE)), ), ) if not self._is_attr_blank(ATTR_DISTANCE_FROM_HOME_M): self._set_attr( ATTR_DISTANCE_FROM_HOME_KM, - round(self._get_attr(ATTR_DISTANCE_FROM_HOME_M) / 1000, 3), + round(self._get_attr_safe_float(ATTR_DISTANCE_FROM_HOME_M) / 1000, 3), ) self._set_attr( ATTR_DISTANCE_FROM_HOME_MI, - round(self._get_attr(ATTR_DISTANCE_FROM_HOME_M) / 1609, 3), + round(self._get_attr_safe_float(ATTR_DISTANCE_FROM_HOME_M) / 1609, 3), ) if not self._is_attr_blank(ATTR_LATITUDE_OLD) and not self._is_attr_blank( @@ -1961,24 +2184,26 @@ async def _async_update_coordinates_and_distance(self) -> int: self._set_attr( ATTR_DISTANCE_TRAVELED_M, distance( - float(self._get_attr(ATTR_LATITUDE)), - float(self._get_attr(ATTR_LONGITUDE)), - float(self._get_attr(ATTR_LATITUDE_OLD)), - float(self._get_attr(ATTR_LONGITUDE_OLD)), + float(self._get_attr_safe_str(ATTR_LATITUDE)), + float(self._get_attr_safe_str(ATTR_LONGITUDE)), + float(self._get_attr_safe_str(ATTR_LATITUDE_OLD)), + float(self._get_attr_safe_str(ATTR_LONGITUDE_OLD)), ), ) if not self._is_attr_blank(ATTR_DISTANCE_TRAVELED_M): self._set_attr( ATTR_DISTANCE_TRAVELED_MI, round( - self._get_attr(ATTR_DISTANCE_TRAVELED_M) / 1609, + self._get_attr_safe_float(ATTR_DISTANCE_TRAVELED_M) / 1609, 3, ), ) - if last_distance_traveled_m > self._get_attr(ATTR_DISTANCE_FROM_HOME_M): + if last_distance_traveled_m > self._get_attr_safe_float(ATTR_DISTANCE_FROM_HOME_M): self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "towards home") - elif last_distance_traveled_m < self._get_attr(ATTR_DISTANCE_FROM_HOME_M): + elif last_distance_traveled_m < self._get_attr_safe_float( + ATTR_DISTANCE_FROM_HOME_M + ): self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "away from home") else: self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "stationary") @@ -2005,7 +2230,7 @@ async def _async_update_coordinates_and_distance(self) -> int: _LOGGER.info( "(%s) Distance from home [%s]: %s km", self._get_attr(CONF_NAME), - (self._get_attr(CONF_HOME_ZONE)).split(".")[1], + self._get_attr_safe_str(CONF_HOME_ZONE).split(".")[1], self._get_attr(ATTR_DISTANCE_FROM_HOME_KM), ) _LOGGER.info( @@ -2016,7 +2241,7 @@ async def _async_update_coordinates_and_distance(self) -> int: _LOGGER.info( "(%s) Meters traveled since last update: %s", self._get_attr(CONF_NAME), - round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1), + round(self._get_attr_safe_float(ATTR_DISTANCE_TRAVELED_M), 1), ) else: proceed_with_update = 0 @@ -2078,7 +2303,7 @@ async def _async_do_update(self, reason: str) -> None: await self._update_entity_name_and_cleanup() await self._update_previous_state() await self._update_old_coordinates() - prev_last_place_name = self._get_attr(ATTR_LAST_PLACE_NAME) + prev_last_place_name = self._get_attr_safe_str(ATTR_LAST_PLACE_NAME) # 0: False. 1: True. 2: False, but set direction of travel to stationary proceed_with_update: int = await self._check_device_tracker_and_update_coords() @@ -2100,13 +2325,13 @@ async def _async_do_update(self, reason: str) -> None: else: await self._rollback_update(previous_attr, now, proceed_with_update) - self._set_attr(ATTR_LAST_UPDATED, str(now.isoformat(sep=" ", timespec="seconds"))) + self._set_attr(ATTR_LAST_UPDATED, now.isoformat(sep=" ", timespec="seconds")) _LOGGER.info("(%s) End of Update", self._get_attr(CONF_NAME)) async def _should_update_state(self, now: datetime) -> bool: - prev_state = self._get_attr(ATTR_PREVIOUS_STATE) - native_value = self._get_attr(ATTR_NATIVE_VALUE) - tracker_zone = self._get_attr(ATTR_DEVICETRACKER_ZONE) + prev_state: str = self._get_attr_safe_str(ATTR_PREVIOUS_STATE) + native_value: str = self._get_attr_safe_str(ATTR_NATIVE_VALUE) + tracker_zone: str = self._get_attr_safe_str(ATTR_DEVICETRACKER_ZONE) if ( ( @@ -2133,11 +2358,11 @@ async def _handle_state_update(self, now: datetime, prev_last_place_name: str) - current_time: str = f"{now.hour:02}:{now.minute:02}" if self._get_attr(CONF_SHOW_TIME): state: str = await Places._async_clear_since_from_state( - str(self._get_attr(ATTR_NATIVE_VALUE)) + self._get_attr_safe_str(ATTR_NATIVE_VALUE) ) self._set_attr(ATTR_NATIVE_VALUE, f"{state[: 255 - 14]} (since {current_time})") else: - self._set_attr(ATTR_NATIVE_VALUE, self._get_attr(ATTR_NATIVE_VALUE)[:255]) + self._set_attr(ATTR_NATIVE_VALUE, self._get_attr_safe_str(ATTR_NATIVE_VALUE)[:255]) _LOGGER.info( "(%s) New State: %s", self._get_attr(CONF_NAME), @@ -2195,7 +2420,7 @@ async def _update_previous_state(self) -> None: self._set_attr( ATTR_PREVIOUS_STATE, await Places._async_clear_since_from_state( - orig_state=self._get_attr(ATTR_NATIVE_VALUE) + orig_state=self._get_attr_safe_str(ATTR_NATIVE_VALUE) ), ) else: @@ -2271,14 +2496,16 @@ async def _query_osm_and_finalize(self, now: datetime) -> None: ) if not self._is_attr_blank(ATTR_OSM_DICT): await self._async_parse_osm_dict() - await self._async_finalize_last_place_name(self._get_attr(ATTR_LAST_PLACE_NAME)) + await self._async_finalize_last_place_name( + self._get_attr_safe_str(ATTR_LAST_PLACE_NAME) + ) await self._process_display_options() - self._set_attr(ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds"))) + self._set_attr(ATTR_LAST_CHANGED, now.isoformat(sep=" ", timespec="seconds")) async def _process_display_options(self) -> None: display_options: list[str] = [] if not self._is_attr_blank(ATTR_DISPLAY_OPTIONS): - options_array = (self._get_attr(ATTR_DISPLAY_OPTIONS)).split(",") + options_array = self._get_attr_safe_str(ATTR_DISPLAY_OPTIONS).split(",") for option in options_array: display_options.extend([option.strip()]) self._set_attr(ATTR_DISPLAY_OPTIONS_LIST, display_options) @@ -2297,7 +2524,9 @@ async def _process_display_options(self) -> None: self._get_attr(ATTR_NATIVE_VALUE), ) - elif any(ext in (self._get_attr(ATTR_DISPLAY_OPTIONS)) for ext in ["(", ")", "[", "]"]): + elif any( + ext in (self._get_attr_safe_str(ATTR_DISPLAY_OPTIONS)) for ext in ["(", ")", "[", "]"] + ): self._clear_attr(ATTR_DISPLAY_OPTIONS_LIST) display_options = [] self._adv_options_state_list = [] @@ -2310,11 +2539,14 @@ async def _process_display_options(self) -> None: self._get_attr(ATTR_DISPLAY_OPTIONS), ) - await self._async_build_from_advanced_options(self._get_attr(ATTR_DISPLAY_OPTIONS)) - # _LOGGER.debug( - # f"({self._get_attr(CONF_NAME)}) Back from initial advanced build: " - # + f"{self._adv_options_state_list}" - # ) + await self._async_build_from_advanced_options( + self._get_attr_safe_str(ATTR_DISPLAY_OPTIONS) + ) + _LOGGER.debug( + "(%s) Back from initial advanced build: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) await self._async_compile_state_from_advanced_options() elif not await self._async_in_zone(): await self._async_build_state_from_display_options() @@ -2344,15 +2576,15 @@ async def _process_display_options(self) -> None: async def _build_osm_url(self) -> str: """Build the OpenStreetMap query URL.""" base_url = "https://nominatim.openstreetmap.org/reverse?format=json" - lat = self._get_attr(ATTR_LATITUDE) - lon = self._get_attr(ATTR_LONGITUDE) - lang: str = self._get_attr(CONF_LANGUAGE) if not self._is_attr_blank(CONF_LANGUAGE) else "" - email: str = self._get_attr(CONF_API_KEY) if not self._is_attr_blank(CONF_API_KEY) else "" + lat: str = self._get_attr_safe_str(ATTR_LATITUDE) + lon: str = self._get_attr_safe_str(ATTR_LONGITUDE) + lang: str = self._get_attr_safe_str(CONF_LANGUAGE) + email: str = self._get_attr_safe_str(CONF_API_KEY) return f"{base_url}&lat={lat}&lon={lon}&accept-language={lang}&addressdetails=1&namedetails=1&zoom=18&limit=1&email={email}" async def _async_change_dot_to_stationary(self, now: datetime, changed_diff_sec: int) -> None: self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "stationary") - self._set_attr(ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds"))) + self._set_attr(ATTR_LAST_CHANGED, now.isoformat(sep=" ", timespec="seconds")) await self._hass.async_add_executor_job( self._write_sensor_to_json, self._get_attr(CONF_NAME), @@ -2371,13 +2603,13 @@ async def _async_change_show_time_to_date(self) -> None: else: dateformat = "%m/%d" mmddstring: str = ( - datetime.fromisoformat(self._get_attr(ATTR_LAST_CHANGED)) + datetime.fromisoformat(self._get_attr_safe_str(ATTR_LAST_CHANGED)) .strftime(f"{dateformat}") .replace(" ", "")[:5] ) self._set_attr( ATTR_NATIVE_VALUE, - f"{await Places._async_clear_since_from_state(str(self._get_attr(ATTR_NATIVE_VALUE)))} (since {mmddstring})", + f"{await Places._async_clear_since_from_state(self._get_attr_safe_str(ATTR_NATIVE_VALUE))} (since {mmddstring})", ) if not self._is_attr_blank(ATTR_NATIVE_VALUE): @@ -2402,7 +2634,9 @@ async def _async_get_seconds_from_last_change(self, now: datetime) -> int: if self._is_attr_blank(ATTR_LAST_CHANGED): return 3600 try: - last_changed: datetime = datetime.fromisoformat(self._get_attr(ATTR_LAST_CHANGED)) + last_changed: datetime = datetime.fromisoformat( + self._get_attr_safe_str(ATTR_LAST_CHANGED) + ) except (TypeError, ValueError) as e: _LOGGER.warning( "Error converting Last Changed date/time (%s) into datetime: %r",