Skip to content

Commit

Permalink
bugfix: 修复节点状态联调问题
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuaikang committed Dec 4, 2023
1 parent e0312da commit 5a4d9ab
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
13 changes: 9 additions & 4 deletions gcloud/core/apis/drf/viewsets/taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,24 @@ def _filter_pipeline_pause(self):
获取所有暂停的任务,当任务暂停时,pipeline 的状态会变成暂停,
return:
"""
return self.queryset.filter(
pipeline_instance_id__in=self._fetch_pipeline_instance_ids(statuses=[states.SUSPENDED], by_root=False)
)

pause_pipeline_instance_ids = set(
self._fetch_pipeline_instance_ids(statuses=[states.SUSPENDED], by_root=True)
) - set(self._fetch_pipeline_instance_ids(statuses=[states.FAILED]))
return self.queryset.filter(pipeline_instance_id__in=pause_pipeline_instance_ids)

def _filter_running(self):
"""
正在运行的流程等于未完成的任务 -(暂停 + 失败的任务)
@return:
"""
return self.queryset.exclude(

running_task_queryset = self.queryset.exclude(
pipeline_instance_id__in=self._fetch_pipeline_instance_ids(statuses=[states.FAILED, states.SUSPENDED])
)
pending_process_taskflow_ids: typing.List[int] = self._fetch_pending_process_taskflow_ids(running_task_queryset)
return running_task_queryset.exclude(id__in=pending_process_taskflow_ids)

def _filter_pending_process(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions gcloud/taskflow3/domains/dispatchers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ def get_subprocess_status(task_status: dict, subprocess_id: str) -> dict:
] = fetch_node_id__auto_retry_info_map(root_pipeline_id, extract_nodes_by_statuses(status_tree))

self.format_bamboo_engine_status(
task_status, node_ids_gby_code, code__status_map, node_id__auto_retry_info, is_subquery
task_status, node_ids_gby_code, code__status_map, node_id__auto_retry_info, False, is_subquery
)
else:
format_bamboo_engine_status_legacy(task_status)
Expand Down Expand Up @@ -793,7 +793,7 @@ def format_bamboo_engine_status(
if AutoRetryNodeStrategy.objects.filter(root_pipeline_id=status_tree["id"]).exists():
self.handle_subprocess_node_status(status_tree)
else:
if is_subquery or is_child:
if is_subquery:
# 只有在「子查询」(独立子流程)或子递归(非独立子流程)的情况下,才需要校验
auto_retry_info: typing.Optional[typing.Dict[str, typing.Any]] = node_id__auto_retry_info.get(
status_tree["id"]
Expand Down

0 comments on commit 5a4d9ab

Please sign in to comment.