diff --git a/config.defaults.yaml b/config.defaults.yaml index 88b3441f..12b96dc2 100644 --- a/config.defaults.yaml +++ b/config.defaults.yaml @@ -39,6 +39,8 @@ kowalski: bootstrap.test.servers: "localhost:9092" zookeeper.test: "localhost:2181" path: "kafka_2.13-3.4.1" + processes_per_topic: + ZTF: 2 database: max_pool_size: 200 diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index e25097fa..465aa4ee 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -317,8 +317,6 @@ def make_filter_templates(self, active_filters: Sequence): self.verbose > 1, ): response = self.api_skyportal_get_group(active_filter["group_id"]) - if self.verbose > 1: - log(response.json()) if response.json()["status"] == "success": group_name = ( response.json()["data"]["nickname"] @@ -861,14 +859,46 @@ def watchdog(obs_date: str = None, test: bool = False): for t in topics if (datestr in t) and ("programid" in t) + and ("ztf" in t) and ("zuds" not in t) and ("pgir" not in t) ] log(f"Topics: {topics_tonight}") + processes_per_topic = ( + config["kafka"].get("processes_per_topic", {}).get("ZTF", 1) + ) + for t in topics_tonight: - if t not in topics_on_watch: - log(f"Starting listener thread for {t}") + if t in topics_on_watch and len(topics_on_watch[t]) > 0: + log(f"Performing thread health check for {t}") + try: + for i in range(len(topics_on_watch[t])): + if not topics_on_watch[t][i].is_alive(): + log(f"Thread {i} topic {t} died, removing") + topics_on_watch[t].pop(i) + else: + log(f"Thread {i} topic {t} appears normal") + if len(topics_on_watch[t]) == 0: + log(f"Topic {t} has no threads left, removing") + topics_on_watch.pop(t) + except Exception as _e: + log(f"Failed to perform health check: {_e}") + pass + + if ( + t not in topics_on_watch + or len(topics_on_watch[t]) < processes_per_topic + ): + current_processes = 0 + missing_processes = processes_per_topic + if t not in topics_on_watch: + log(f"Starting listener thread(s) for {t}") + topics_on_watch[t] = [] + else: + log(f"Restarting missing listener thread(s) for {t}") + current_processes = len(topics_on_watch[t]) + missing_processes -= len(topics_on_watch[t]) offset_reset = config["kafka"]["default.topic.config"][ "auto.offset.reset" ] @@ -878,32 +908,28 @@ def watchdog(obs_date: str = None, test: bool = False): bootstrap_servers = config["kafka"]["bootstrap.test.servers"] group = config["kafka"]["group"] - topics_on_watch[t] = multiprocessing.Process( - target=topic_listener, - args=(t, bootstrap_servers, offset_reset, group, test), - ) - topics_on_watch[t].daemon = True - log(f"set daemon to true {topics_on_watch}") - topics_on_watch[t].start() + for _ in range(missing_processes): + topics_on_watch[t].append( + multiprocessing.Process( + target=topic_listener, + args=(t, bootstrap_servers, offset_reset, group, test), + ) + ) - else: - log(f"Performing thread health check for {t}") - try: - if not topics_on_watch[t].is_alive(): - log(f"Thread {t} died, removing") - # topics_on_watch[t].terminate() - topics_on_watch.pop(t, None) - else: - log(f"Thread {t} appears normal") - except Exception as _e: - log(f"Failed to perform health check: {_e}") - pass + for i in range(current_processes, len(topics_on_watch[t])): + topics_on_watch[t][i].daemon = True + log( + f"set daemon to true for thread {i} topic {topics_on_watch}" + ) + topics_on_watch[t][i].start() if test: time.sleep(120) # when testing, wait for topic listeners to pull all the data, then break for t in topics_on_watch: - topics_on_watch[t].kill() + for i in range(len(topics_on_watch[t])): + topics_on_watch[t][i].kill() + log(f"Test mode: Killed thread {i} - topic {t} after 120s") break except Exception as e: