From 14d164845d1f5874d5307b615c67a351d1a1a98c Mon Sep 17 00:00:00 2001 From: Andrew Cockburn Date: Tue, 28 Jan 2025 15:26:31 +0000 Subject: [PATCH] Fix for now_is_between --- appdaemon/scheduler.py | 170 +++++++++++++++++++++++++++-------------- 1 file changed, 113 insertions(+), 57 deletions(-) diff --git a/appdaemon/scheduler.py b/appdaemon/scheduler.py index 82d47ce66..b50d5bb9b 100644 --- a/appdaemon/scheduler.py +++ b/appdaemon/scheduler.py @@ -6,7 +6,7 @@ import traceback import uuid from collections import OrderedDict -from datetime import datetime, time, timedelta, timezone, MAXYEAR +from datetime import MAXYEAR, datetime, time, timedelta, timezone from itertools import count from logging import Logger from typing import TYPE_CHECKING, Any, Callable @@ -23,14 +23,16 @@ time_regex_str = r"(?P\d+):(?P\d+):(?P\d+)(?:\.(?P\d+))?" -date_regex_str = r"^(?P\d{4})-(?P\d{2})-(?P\d{2})" + r"(?:\s+" + f'{time_regex_str})?' +date_regex_str = r"^(?P\d{4})-(?P\d{2})-(?P\d{2})" + \ + r"(?:\s+" + f'{time_regex_str})?' DATE_REGEX = re.compile(date_regex_str) TIME_REGEX = re.compile(f"^{time_regex_str}") SUN_REGEX = re.compile( r"^(?Psunrise|sunset)(?:\s+[+-]\s+(?P\d+):(?P\d+):(?P\d+)(?:\.(?P\d+))?)?", re.IGNORECASE, ) -ELEVATION_REGEX = re.compile(r"^(?P\d+(?:\.\d+)?)\s+deg\s+(?Prising|setting)$", re.IGNORECASE) +ELEVATION_REGEX = re.compile( + r"^(?P\d+(?:\.\d+)?)\s+deg\s+(?Prising|setting)$", re.IGNORECASE) class Scheduler: @@ -199,7 +201,8 @@ async def cancel_timer(self, name: str, handle: str, silent: bool) -> bool: del self.schedule[name] if not executed and not silent: - self.logger.warning(f"Invalid callback handle '{handle}' in cancel_timer() from app {name}") + self.logger.warning(f"Invalid callback handle '{ + handle}' in cancel_timer() from app {name}") return executed @@ -216,12 +219,14 @@ async def restart_timer(self, uuid_: str, args: dict, restart_offset: int = 0) - # the timestamp with the repeat interval if restart_offset > 0: # we to restart with an offset - new_timestamp = args["timestamp"] + timedelta(seconds=restart_offset) + new_timestamp = args["timestamp"] + \ + timedelta(seconds=restart_offset) args["timestamp"] = new_timestamp else: args["basetime"] += timedelta(seconds=args["interval"]) - args["timestamp"] = args["basetime"] + timedelta(seconds=self.get_offset(**args['kwargs'])) + args["timestamp"] = args["basetime"] + \ + timedelta(seconds=self.get_offset(**args['kwargs'])) # Update entity @@ -229,7 +234,8 @@ async def restart_timer(self, uuid_: str, args: dict, restart_offset: int = 0) - "_scheduler", "admin", f"scheduler_callback.{uuid_}", - execution_time=utils.dt_to_str(args["timestamp"].replace(microsecond=0), self.AD.tz), + execution_time=utils.dt_to_str( + args["timestamp"].replace(microsecond=0), self.AD.tz), ) return args @@ -246,7 +252,8 @@ async def reset_timer(self, name: str, handle: str) -> bool: if args["type"] == "next_rising" or args["type"] == "next_setting": self.logger.warning( - f"The given handle '{handle}' in reset_timer() from app {name} is a Sun timer, cannot" " reset that" + f"The given handle '{handle}' in reset_timer() from app { + name} is a Sun timer, cannot" " reset that" ) return executed @@ -255,7 +262,8 @@ async def reset_timer(self, name: str, handle: str) -> bool: # we get the time from now to be added basetime_interval = args["basetime_interval"] - restart_offset = basetime_interval - (args["timestamp"] - now).seconds + restart_offset = basetime_interval - \ + (args["timestamp"] - now).seconds args = await self.restart_timer(handle, args, restart_offset) self.schedule[name][handle] = args @@ -270,7 +278,8 @@ async def reset_timer(self, name: str, handle: str) -> bool: if not executed: self.logger.warning( - f"The given handle '{handle}' in reset_timer() from app {name}, doesn't have a running timer" + f"The given handle '{handle}' in reset_timer() from app { + name}, doesn't have a running timer" ) return executed @@ -373,13 +382,15 @@ async def exec_schedule(self, name, args, uuid_): except Exception: error_logger = logging.getLogger("Error.{}".format(name)) error_logger.warning("-" * 60) - error_logger.warning("Unexpected error during exec_schedule() for App: %s", name) + error_logger.warning( + "Unexpected error during exec_schedule() for App: %s", name) error_logger.warning("Args: %s", args) error_logger.warning("-" * 60) error_logger.warning(traceback.format_exc()) error_logger.warning("-" * 60) if self.AD.logging.separate_error_log() is True: - self.logger.warning("Logged an error to %s", self.AD.logging.get_filename("error_log")) + self.logger.warning("Logged an error to %s", + self.AD.logging.get_filename("error_log")) error_logger.warning("Scheduler entry has been deleted") error_logger.warning("-" * 60) await self.AD.state.remove_entity("admin", "scheduler_callback.{}".format(uuid_)) @@ -395,7 +406,8 @@ def init_sun(self): if longitude < -180 or longitude > 180: raise ValueError("Longitude needs to be -180 .. 180") - self.location = Location(LocationInfo("", "", self.AD.tz.zone, latitude, longitude)) + self.location = Location(LocationInfo( + "", "", self.AD.tz.zone, latitude, longitude)) async def sun(self, type: str, secs_offset: int) -> datetime: return (await self.get_next_sun_event(type, secs_offset)) + timedelta(seconds=secs_offset) @@ -470,7 +482,7 @@ async def next_sunset(self, days_offset: int = 0) -> datetime: @staticmethod def get_offset(**kwargs): - kwargs = {k:v for k, v in kwargs.items() if v is not None} + kwargs = {k: v for k, v in kwargs.items() if v is not None} if kwargs.get("offset"): if "random_start" in kwargs or "random_end" in kwargs: raise ValueError( @@ -504,18 +516,21 @@ async def get_next_period( match start: case str() if start.startswith('now'): now_offset = 0 - if "+" in start and (m := re.search(r'\d+', start)): # meaning time to be added + # meaning time to be added + if "+" in start and (m := re.search(r'\d+', start)): now_offset = int(m.group()) aware_start = now + interval + timedelta(seconds=now_offset) case time(): - aware_start = datetime.combine(now.date(), start).astimezone(self.AD.tz) + aware_start = datetime.combine( + now.date(), start).astimezone(self.AD.tz) case datetime(): aware_start = self.AD.sched.convert_naive(start) case None: aware_start = now + interval case _: raise ValueError(f'Bad value for start: {start}') - assert isinstance(aware_start, datetime) and aware_start.tzinfo is not None + assert isinstance( + aware_start, datetime) and aware_start.tzinfo is not None while True: if aware_start >= now: @@ -537,7 +552,8 @@ def is_realtime(self): # def get_next_entries(self): - next_exec = datetime.now(pytz.utc).replace(year=MAXYEAR, month=12, day=31) + next_exec = datetime.now(pytz.utc).replace( + year=MAXYEAR, month=12, day=31) for name in self.schedule.keys(): for entry in self.schedule[name].keys(): if self.schedule[name][entry]["timestamp"] < next_exec: @@ -549,7 +565,8 @@ def get_next_entries(self): for entry in self.schedule[name].keys(): if self.schedule[name][entry]["timestamp"] == next_exec: next_entries.append( - {"name": name, "uuid": entry, "timestamp": self.schedule[name][entry]["timestamp"]} + {"name": name, "uuid": entry, + "timestamp": self.schedule[name][entry]["timestamp"]} ) return next_entries @@ -564,11 +581,13 @@ def get_next_dst_offset(self, base, limit): # TODO : Convert this to some sort of binary search for efficiency # TODO : This really should support sub 1 second periods better - self.logger.debug("get_next_dst_offset() base=%s limit=%s", base, limit) + self.logger.debug( + "get_next_dst_offset() base=%s limit=%s", base, limit) current = base.astimezone(self.AD.tz).dst() self.logger.debug("current=%s", current) for offset in range(1, int(limit) + 1): - candidate = (base + timedelta(seconds=offset)).astimezone(self.AD.tz) + candidate = (base + timedelta(seconds=offset) + ).astimezone(self.AD.tz) # print(candidate) if candidate.dst() != current: return offset @@ -588,7 +607,8 @@ async def loop(self): # noqa: C901 if self.AD.timewarp == 0: self.logger.info("Time displacement factor infinite") else: - self.logger.info("Time displacement factor %s", self.AD.timewarp) + self.logger.info( + "Time displacement factor %s", self.AD.timewarp) else: self.logger.info("Scheduler running in realtime") @@ -612,7 +632,8 @@ async def loop(self): # noqa: C901 else: if result is True: # We got kicked so lets figure out the elapsed pseudo time - delta = (now - self.last_fired).total_seconds() * self.AD.timewarp + delta = (now - self.last_fired).total_seconds() * \ + self.AD.timewarp else: if len(next_entries) > 0: @@ -639,7 +660,8 @@ async def loop(self): # noqa: C901 if old_dst_offset != dst_offset: # # DST began or ended, lets prove we noticed - self.logger.info("Daylight Savings Time transition detected") + self.logger.info( + "Daylight Savings Time transition detected") # # Re calculate next entries # @@ -674,7 +696,8 @@ async def loop(self): # noqa: C901 next_entries = self.get_next_entries() self.logger.debug("Next entries: %s", next_entries) if len(next_entries) > 0: - delay = (next_entries[0]["timestamp"] - self.now).total_seconds() + delay = (next_entries[0]["timestamp"] - + self.now).total_seconds() else: # Nothing to do, lets wait for a while, we will get woken up if anything new comes along delay = idle_time @@ -756,7 +779,8 @@ async def info_timer(self, handle, name) -> tuple[datetime, int, dict] | None: return ( self.make_naive(callback["timestamp"]), callback["interval"], - self.sanitize_timer_kwargs(self.AD.app_management.objects[name].object, callback["kwargs"]), + self.sanitize_timer_kwargs( + self.AD.app_management.objects[name].object, callback["kwargs"]), ) async def get_scheduler_entries(self): @@ -769,21 +793,28 @@ async def get_scheduler_entries(self): ): schedule[name][str(entry)] = {} schedule[name][str(entry)]["timestamp"] = str( - self.AD.sched.make_naive(self.schedule[name][entry]["timestamp"]) + self.AD.sched.make_naive( + self.schedule[name][entry]["timestamp"]) ) - schedule[name][str(entry)]["type"] = self.schedule[name][entry]["type"] - schedule[name][str(entry)]["name"] = self.schedule[name][entry]["name"] + schedule[name][str( + entry)]["type"] = self.schedule[name][entry]["type"] + schedule[name][str( + entry)]["name"] = self.schedule[name][entry]["name"] schedule[name][str(entry)]["basetime"] = str( - self.AD.sched.make_naive(self.schedule[name][entry]["basetime"]) + self.AD.sched.make_naive( + self.schedule[name][entry]["basetime"]) ) - schedule[name][str(entry)]["repeat"] = self.schedule[name][entry]["repeat"] + schedule[name][str( + entry)]["repeat"] = self.schedule[name][entry]["repeat"] if self.schedule[name][entry]["type"] == "next_rising": schedule[name][str(entry)]["interval"] = "sunrise:{}".format( - utils.format_seconds(self.schedule[name][entry]["offset"]) + utils.format_seconds( + self.schedule[name][entry]["offset"]) ) elif self.schedule[name][entry]["type"] == "next_setting": schedule[name][str(entry)]["interval"] = "sunset:{}".format( - utils.format_seconds(self.schedule[name][entry]["offset"]) + utils.format_seconds( + self.schedule[name][entry]["offset"]) ) elif self.schedule[name][entry]["repeat"] is True: schedule[name][str(entry)]["interval"] = utils.format_seconds( @@ -792,14 +823,18 @@ async def get_scheduler_entries(self): else: schedule[name][str(entry)]["interval"] = "None" - schedule[name][str(entry)]["offset"] = self.schedule[name][entry]["offset"] + schedule[name][str( + entry)]["offset"] = self.schedule[name][entry]["offset"] schedule[name][str(entry)]["kwargs"] = "" for kwarg in self.schedule[name][entry]["kwargs"]: - schedule[name][str(entry)]["kwargs"] = utils.get_kwargs(self.schedule[name][entry]["kwargs"]) + schedule[name][str(entry)]["kwargs"] = utils.get_kwargs( + self.schedule[name][entry]["kwargs"]) if isinstance(self.schedule[name][entry]["callback"], functools.partial): - schedule[name][str(entry)]["callback"] = self.schedule[name][entry]["callback"].func.__name__ + schedule[name][str( + entry)]["callback"] = self.schedule[name][entry]["callback"].func.__name__ else: - schedule[name][str(entry)]["callback"] = self.schedule[name][entry]["callback"].__name__ + schedule[name][str( + entry)]["callback"] = self.schedule[name][entry]["callback"].__name__ schedule[name][str(entry)]["pin_thread"] = ( self.schedule[name][entry]["pin_thread"] if self.schedule[name][entry]["pin_thread"] != -1 @@ -811,7 +846,8 @@ async def get_scheduler_entries(self): # Order it - ordered_schedule = OrderedDict(sorted(schedule.items(), key=lambda x: x[0])) + ordered_schedule = OrderedDict( + sorted(schedule.items(), key=lambda x: x[0])) return ordered_schedule @@ -840,6 +876,15 @@ async def get_now_ts(self) -> float: async def get_now_naive(self): return self.make_naive(await self.get_now()) + async def get_dt_from_param(self, time, name, today, days_offset): + if isinstance(time, str): + return (await self._parse_time(time, name, today=today, days_offset=days_offset)) + + elif isinstance(time, datetime): + return time + else: + raise ValueError("Unknown type in now_is_between()") + async def now_is_between( self, start_time: str | datetime, @@ -848,9 +893,9 @@ async def now_is_between( now: str | None = None ): if isinstance(start_time, str): - start_time = (await self._parse_time(start_time, name, today=True, days_offset=0))["datetime"] + start_time_dt = (await self.get_dt_from_param(start_time, name, today=True, days_offset=0))["datetime"] if isinstance(end_time, str): - end_time = (await self._parse_time(end_time, name, today=True, days_offset=0))["datetime"] + end_time_dt = (await self.get_dt_from_param(end_time, name, today=True, days_offset=0))["datetime"] if now is not None: now = (await self._parse_time(now, name))["datetime"] @@ -862,28 +907,28 @@ async def now_is_between( # ) # Comparisons - if end_time < start_time: + if end_time_dt < start_time_dt: # Start and end time backwards. # Spans midnight # Lets start by assuming end_time is wrong and should be tomorrow # This will be true if we are currently after start_time - end_time = (await self._parse_time(end_time, name, today=True, days_offset=1))["datetime"] + end_time_dt = (await self.get_dt_from_param(end_time, name, today=True, days_offset=1))["datetime"] # self.logger.info( # f"\nMidnight transition detected\nstart = {start_time}\nnow = {now}\nend = {end_time}\n" + "-" * 80 # ) - if now < start_time and now < end_time: + if now < start_time_dt and now < end_time_dt: # Well, it's complicated - # We crossed into a new day and things changed. # Now all times have shifted relative to the new day, so we need to look at it differently # If both times are now in the future, we now actually want to set start time back a day and keep end_time as today - start_time = (await self._parse_time(start_time, name, today=True, days_offset=-1))["datetime"] - end_time = (await self._parse_time(end_time, name, today=True, days_offset=0))["datetime"] + start_time_dt = (await self.get_dt_from_param(start_time, name, today=True, days_offset=-1))["datetime"] + end_time_dt = (await self.get_dt_from_param(end_time, name, today=True, days_offset=0))["datetime"] # self.logger.info(f"\nReverse\nstart = {start_time}\nnow = {now}\nend = {end_time}\n" + "=" * 80) # self.logger.info(f"\nFinal\nstart = {start_time}\nnow = {now}\nend = {end_time}\n" + "-" * 80) # self.logger.info(f"Final decision: {start_time <= now <= end_time}\n" + "=" * 80) - return start_time <= now <= end_time + return start_time_dt <= now <= end_time_dt async def sunset(self, aware: bool = True, today: bool = False, days_offset: int = 0) -> datetime: if today: @@ -952,19 +997,23 @@ async def _parse_time( # parse time with date if match := DATE_REGEX.match(time_str): - kwargs = {k: int(v) for k, v in match.groupdict().items() if v is not None} + kwargs = {k: int(v) + for k, v in match.groupdict().items() if v is not None} if "microsecond" in kwargs: - kwargs["microsecond"] = int(float(f"0.{kwargs['microsecond']}") * 10**6) + kwargs["microsecond"] = int( + float(f"0.{kwargs['microsecond']}") * 10**6) dt = datetime(**kwargs) + timedelta(days=days_offset) # parse time based on time only (date will be today) elif match := TIME_REGEX.match(time_str): - kwargs = {k: int(v) for k, v in match.groupdict().items() if v is not None} + kwargs = {k: int(v) + for k, v in match.groupdict().items() if v is not None} if "microsecond" in kwargs: - kwargs["microsecond"] = int(float(f"0.{kwargs['microsecond']}") * 10**6) + kwargs["microsecond"] = int( + float(f"0.{kwargs['microsecond']}") * 10**6) today = (await self.get_now()).date() dt = datetime.combine(today, time(**kwargs)) + timedelta( @@ -984,10 +1033,12 @@ async def _parse_time( case _: raise ValueError(f"Invalid sun event: {sun}") - kwargs = {k: int(v) for k, v in match_dict.items() if v is not None} + kwargs = {k: int(v) + for k, v in match_dict.items() if v is not None} if "microsecond" in kwargs: - kwargs["microsecond"] = int(float(f"0.{kwargs['microsecond']}") * 10**6) + kwargs["microsecond"] = int( + float(f"0.{kwargs['microsecond']}") * 10**6) td = timedelta(**kwargs) offset = td.total_seconds() @@ -1043,9 +1094,11 @@ async def dump_schedule(self): if self.schedule == {}: self.diag.info("Scheduler Table is empty") else: - self.diag.info("--------------------------------------------------") + self.diag.info( + "--------------------------------------------------") self.diag.info("Scheduler Table") - self.diag.info("--------------------------------------------------") + self.diag.info( + "--------------------------------------------------") for name in self.schedule.keys(): self.diag.info("%s:", name) for entry in sorted( @@ -1054,10 +1107,12 @@ async def dump_schedule(self): ): self.diag.info( " Next Event Time: %s - data: %s", - self.make_naive(self.schedule[name][entry]["timestamp"]), + self.make_naive( + self.schedule[name][entry]["timestamp"]), self.schedule[name][entry], ) - self.diag.info("--------------------------------------------------") + self.diag.info( + "--------------------------------------------------") # # Utilities @@ -1068,7 +1123,8 @@ def sanitize_timer_kwargs(app: "ADBase", kwargs: dict) -> dict: kwargs_copy = kwargs.copy() return utils._sanitize_kwargs( kwargs_copy, - ["interval", "constrain_days", "constrain_input_boolean", "_pin_app", "_pin_thread", "__silent"] + ["interval", "constrain_days", "constrain_input_boolean", + "_pin_app", "_pin_thread", "__silent"] + app.constraints, )