
Commit

Merge pull request #921 from AI4Bharat/exportVOTR
minor fixes in task queue status
aparna-aa authored Oct 15, 2024
2 parents dc1ab07 + bcd9d81 commit 7e8fb25
Showing 1 changed file with 29 additions and 37 deletions.
66 changes: 29 additions & 37 deletions backend/task/views.py
@@ -3194,7 +3194,14 @@ def inspect_queue(self, request):
elif elem["name"] == "task.tasks.celery_nmt_call":
task_obj["task_id"] = eval(elem["kwargs"])["task_id"]
elif elem["name"] == "task.tasks.celery_nmt_tts_call":
task_obj["task_id"] = eval(elem["kwargs"])["task_id"] or eval(elem["args"])[0]
try:
task_obj["task_id"] = eval(elem["kwargs"])["task_id"]
except:
task_obj["task_id"] = eval(elem["args"].split(",")[0].split("(")[1])
elif elem["name"] == "voiceover.tasks.celery_integration":
task_obj["task_id"] = eval(elem["args"].split(",")[2])
elif elem["name"] == "voiceover.tasks.export_voiceover_async":
task_obj["task_id"] = eval(elem["args"].split(",")[0].split("(")[1])
else:
task_obj["task_id"] = ""

@@ -3231,22 +3238,18 @@ def inspect_queue(self, request):
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
else:
if queue == "nmt":
queue_type = "task.tasks.celery_nmt_call"
elif queue == "tts":
queue_type = "task.tasks.celery_tts_call"
elif queue == "nmt_tts":
queue_type = "task.tasks.celery_nmt_tts_call"
if queue == "nmt" or queue == "nmt_tts":
queue_type = "celery@nmt_worker"
else:
queue_type = "task.tasks.celery_asr_call"
queue_type = "celery@asr_tts_worker"

try:
task_list = []
status_list = []
url = f"{flower_url}/api/tasks"
params = {
"state": "STARTED",
"sort_by": "received",
"name": queue_type,
"sort_by": "-received",
"workername": queue_type,
}
if flower_username and flower_password:
res = requests.get(
@@ -3259,42 +3262,27 @@ def inspect_queue(self, request):
for elem in task_data:
if queue == "asr" and elem["name"] == "task.tasks.celery_asr_call":
task_list.append(eval(elem["kwargs"])["task_id"])
status_list.append(elem["state"])
elif (
queue == "tts" and elem["name"] == "task.tasks.celery_tts_call"
):
# task_list.append(eval(elem["kwargs"])["task_id"])
task_list.append(eval(elem["args"].split(",")[0].split("(")[1]))
status_list.append(elem["state"])
elif (
queue == "nmt" and elem["name"] == "task.tasks.celery_nmt_call"
):
task_list.append(eval(elem["kwargs"])["task_id"])
else:
pass
params = {
"state": "RECEIVED",
"sort_by": "received",
"workername": queue_type,
}
if flower_username and flower_password:
res = requests.get(
url, params=params, auth=(flower_username, flower_password)
)
else:
res = requests.get(url, params=params)
data = res.json()
task_data = list(data.values())
for elem in task_data:
if queue == "asr" and elem["name"] == "task.tasks.celery_asr_call":
task_list.append(eval(elem["kwargs"])["task_id"])
elif (
queue == "tts" and elem["name"] == "task.tasks.celery_tts_call"
):
# task_list.append(eval(elem["kwargs"])["task_id"])
task_list.append(eval(elem["args"].split(",")[0].split("(")[1]))
status_list.append(elem["state"])
elif (
queue == "nmt" and elem["name"] == "task.tasks.celery_nmt_call"
queue == "nmt_tts" and elem["name"] == "task.tasks.celery_nmt_tts_call"
):
task_list.append(eval(elem["kwargs"])["task_id"])
try:
task_list.append(eval(elem["kwargs"])["task_id"])
status_list.append(elem["state"])
except:
task_list.append(eval(elem["args"].split(",")[0].split("(")[1]))
status_list.append(elem["state"])
else:
pass
if task_list:
Expand All @@ -3318,8 +3306,12 @@ def inspect_queue(self, request):
"video_duration": str(elem["video__duration"]),
}
i = task_list.index(elem["id"])
task_dict["status"] = status_list[i]
task_list[i] = task_dict

for i in task_list:
if type(i) == int:
j = task_list.index(i)
task_list[j] = {"task_id": i, "status": "Not Found"}
return Response(
{"message": "successful", "data": task_list},
status=status.HTTP_200_OK,
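
For reference, the status lookup in this view goes through Flower's HTTP task API with the parameters shown in the diff. A minimal sketch of that request, assuming the same `flower_url` and optional basic-auth credentials used above; the function and argument names are illustrative, not taken from the repository:

```python
import requests

def fetch_started_tasks(flower_url, worker, username=None, password=None):
    """Query Flower's /api/tasks for STARTED tasks on one worker.

    Mirrors the parameters used in inspect_queue above; the function and
    argument names here are illustrative, not part of the repository code.
    """
    params = {
        "state": "STARTED",
        "sort_by": "-received",  # newest first, as in the updated view
        "workername": worker,    # e.g. "celery@nmt_worker" or "celery@asr_tts_worker"
    }
    auth = (username, password) if username and password else None
    res = requests.get(f"{flower_url}/api/tasks", params=params, auth=auth)
    res.raise_for_status()
    # Flower returns a mapping of task UUID -> task metadata; the view walks
    # the values and matches on elem["name"] / elem["kwargs"] / elem["args"].
    return list(res.json().values())
```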
