Skip to content

Commit

Permalink
Parallelize topic reads - consumer groups (#267)
Browse files (browse the repository at this point in the history)
* multiple consumers per topic, configurable, ZTF-only for now
  • Loading branch information
Theodlz authored Jan 12, 2024
1 parent 8c3f89f commit 54a22d6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
2 changes: 2 additions & 0 deletions config.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ kowalski:
bootstrap.test.servers: "localhost:9092"
zookeeper.test: "localhost:2181"
path: "kafka_2.13-3.4.1"
processes_per_topic:
ZTF: 2

database:
max_pool_size: 200
Expand Down
74 changes: 50 additions & 24 deletions kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ def make_filter_templates(self, active_filters: Sequence):
self.verbose > 1,
):
response = self.api_skyportal_get_group(active_filter["group_id"])
if self.verbose > 1:
log(response.json())
if response.json()["status"] == "success":
group_name = (
response.json()["data"]["nickname"]
Expand Down Expand Up @@ -861,14 +859,46 @@ def watchdog(obs_date: str = None, test: bool = False):
for t in topics
if (datestr in t)
and ("programid" in t)
and ("ztf" in t)
and ("zuds" not in t)
and ("pgir" not in t)
]
log(f"Topics: {topics_tonight}")

processes_per_topic = (
config["kafka"].get("processes_per_topic", {}).get("ZTF", 1)
)

for t in topics_tonight:
if t not in topics_on_watch:
log(f"Starting listener thread for {t}")
if t in topics_on_watch and len(topics_on_watch[t]) > 0:
log(f"Performing thread health check for {t}")
try:
for i in range(len(topics_on_watch[t])):
if not topics_on_watch[t][i].is_alive():
log(f"Thread {i} topic {t} died, removing")
topics_on_watch[t].pop(i)
else:
log(f"Thread {i} topic {t} appears normal")
if len(topics_on_watch[t]) == 0:
log(f"Topic {t} has no threads left, removing")
topics_on_watch.pop(t)
except Exception as _e:
log(f"Failed to perform health check: {_e}")
pass

if (
t not in topics_on_watch
or len(topics_on_watch[t]) < processes_per_topic
):
current_processes = 0
missing_processes = processes_per_topic
if t not in topics_on_watch:
log(f"Starting listener thread(s) for {t}")
topics_on_watch[t] = []
else:
log(f"Restarting missing listener thread(s) for {t}")
current_processes = len(topics_on_watch[t])
missing_processes -= len(topics_on_watch[t])
offset_reset = config["kafka"]["default.topic.config"][
"auto.offset.reset"
]
Expand All @@ -878,32 +908,28 @@ def watchdog(obs_date: str = None, test: bool = False):
bootstrap_servers = config["kafka"]["bootstrap.test.servers"]
group = config["kafka"]["group"]

topics_on_watch[t] = multiprocessing.Process(
target=topic_listener,
args=(t, bootstrap_servers, offset_reset, group, test),
)
topics_on_watch[t].daemon = True
log(f"set daemon to true {topics_on_watch}")
topics_on_watch[t].start()
for _ in range(missing_processes):
topics_on_watch[t].append(
multiprocessing.Process(
target=topic_listener,
args=(t, bootstrap_servers, offset_reset, group, test),
)
)

else:
log(f"Performing thread health check for {t}")
try:
if not topics_on_watch[t].is_alive():
log(f"Thread {t} died, removing")
# topics_on_watch[t].terminate()
topics_on_watch.pop(t, None)
else:
log(f"Thread {t} appears normal")
except Exception as _e:
log(f"Failed to perform health check: {_e}")
pass
for i in range(current_processes, len(topics_on_watch[t])):
topics_on_watch[t][i].daemon = True
log(
f"set daemon to true for thread {i} topic {topics_on_watch}"
)
topics_on_watch[t][i].start()

if test:
time.sleep(120)
# when testing, wait for topic listeners to pull all the data, then break
for t in topics_on_watch:
topics_on_watch[t].kill()
for i in range(len(topics_on_watch[t])):
topics_on_watch[t][i].kill()
log(f"Test mode: Killed thread {i} - topic {t} after 120s")
break

except Exception as e:
Expand Down

0 comments on commit 54a22d6

Please sign in to comment.