Skip to content

Commit

Permalink
Better logic for health checks and restarting missing processes
Browse files: browse the repository at this point in the history
  • Loading branch information
Theodlz committed Jan 12, 2024
1 parent 3934ded commit 7a0bbee
Showing 1 changed file with 35 additions and 24 deletions.
59 changes: 35 additions & 24 deletions kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,40 @@ def watchdog(obs_date: str = None, test: bool = False):
]
log(f"Topics: {topics_tonight}")

processes_per_topic = (
config["kafka"].get("processes_per_topic", {}).get("ZTF", 1)
)

for t in topics_tonight:
if t not in topics_on_watch:
log(f"Starting listener thread for {t}")
if t in topics_on_watch and len(topics_on_watch[t]) > 0:
log(f"Performing thread health check for {t}")
try:
for i in range(len(topics_on_watch[t])):
if not topics_on_watch[t][i].is_alive():
log(f"Thread {i} topic {t} died, removing")
topics_on_watch[t].pop(i)
else:
log(f"Thread {i} topic {t} appears normal")
if len(topics_on_watch[t]) == 0:
log(f"Topic {t} has no threads left, removing")
topics_on_watch.pop(t)
except Exception as _e:
log(f"Failed to perform health check: {_e}")
pass

if (
t not in topics_on_watch
or len(topics_on_watch[t]) < processes_per_topic
):
current_processes = 0
missing_processes = processes_per_topic
if t not in topics_on_watch:
log(f"Starting listener thread(s) for {t}")
topics_on_watch[t] = []
else:
log(f"Restarting missing listener thread(s) for {t}")
current_processes = len(topics_on_watch[t])
missing_processes -= len(topics_on_watch[t])
offset_reset = config["kafka"]["default.topic.config"][
"auto.offset.reset"
]
Expand All @@ -877,41 +908,21 @@ def watchdog(obs_date: str = None, test: bool = False):
bootstrap_servers = config["kafka"]["bootstrap.test.servers"]
group = config["kafka"]["group"]

processes_per_topic = (
config["kafka"].get("processes_per_topic", {}).get("ZTF", 1)
)
topics_on_watch[t] = []
for _ in range(processes_per_topic):
for _ in range(missing_processes):
topics_on_watch[t].append(
multiprocessing.Process(
target=topic_listener,
args=(t, bootstrap_servers, offset_reset, group, test),
)
)

for i in range(processes_per_topic):
for i in range(current_processes, len(topics_on_watch[t])):
topics_on_watch[t][i].daemon = True
log(
f"set daemon to true for thread {i} topic {topics_on_watch}"
)
topics_on_watch[t][i].start()

else:
log(f"Performing thread health check for {t}")
try:
for i in range(len(topics_on_watch[t])):
if not topics_on_watch[t][i].is_alive():
log(f"Thread {i} topic {t} died, removing")
topics_on_watch[t].pop(i)
else:
log(f"Thread {i} topic {t} appears normal")
if len(topics_on_watch[t]) == 0:
log(f"Topic {t} has no threads left, removing")
topics_on_watch.pop(t)
except Exception as _e:
log(f"Failed to perform health check: {_e}")
pass

if test:
time.sleep(120)
# when testing, wait for topic listeners to pull all the data, then break
Expand Down

0 comments on commit 7a0bbee

Please sign in to comment.