Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature_template_…
Browse files Browse the repository at this point in the history
…market_master
  • Loading branch information
normal-wls committed Dec 27, 2024
2 parents 8090bd0 + dd2f303 commit 03322e4
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 29 deletions.
2 changes: 1 addition & 1 deletion app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ is_use_celery: True
author: 蓝鲸智云
introduction: 标准运维是通过一套成熟稳定的任务调度引擎,把在多系统间的工作整合到一个流程,助力运维实现跨系统调度自动化的SaaS应用。
introduction_en: SOPS is a SaaS application that utilizes a set of mature and stable task scheduling engines to help realize cross-system scheduling automation, and integrates the work among multiple systems into a single process.
version: 3.32.1-p2
version: 3.32.1-p3
category: 运维工具
language_support: 中文
desktop:
Expand Down
2 changes: 1 addition & 1 deletion app_desc.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
spec_version: 2
app_version: "3.32.1-p2"
app_version: "3.32.1-p3"
app:
region: default
bk_app_code: bk_sops
Expand Down
2 changes: 1 addition & 1 deletion config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@
# mako模板中:<script src="/a.js?v=${ STATIC_VERSION }"></script>
# 如果静态资源修改了以后,上线前改这个版本号即可

STATIC_VERSION = "3.32.1-p2"
STATIC_VERSION = "3.32.1-p3"
DEPLOY_DATETIME = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

STATICFILES_DIRS = [os.path.join(BASE_DIR, "static")]
Expand Down
5 changes: 3 additions & 2 deletions frontend/desktop/src/pages/task/TaskExecute/ExecuteInfo.vue
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@
},
// 补充记录缺少的字段
async setFillRecordField (record) {
const { version, component_code: componentCode } = this.nodeDetailConfig
const { version, component_code: componentCode, componentData = {} } = this.nodeDetailConfig
const { inputs, state } = record
let outputs = record.outputs
// 执行记录的outputs可能为Object格式,需要转为Array格式
Expand Down Expand Up @@ -721,7 +721,8 @@
const keys = Object.keys(inputs)
this.renderConfig = renderConfig.filter(item => keys.includes(item.tag_code))
} else if (componentCode) { // 任务节点需要加载标准插件
await this.getNodeConfig(componentCode, version, inputs.plugin_version)
const pluginVersion = componentData.plugin_version?.value
await this.getNodeConfig(componentCode, version, pluginVersion)
}
inputsInfo = Object.keys(inputs).reduce((acc, cur) => {
const scheme = Array.isArray(this.renderConfig) ? this.renderConfig.find(item => item.tag_code === cur) : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,9 +658,10 @@
},
// 变量类型切换
onValTypeChange (val, oldValue) {
// 将上一个类型的填写的数据存起来("集群模块IP选择器"的code与"ip选择器"code相同,需要单独处理)
const valData = oldValue === 'set_module_ip_selector'
? { set_module_ip_selector: tools.deepClone(this.renderData['ip_selector']) }
// 将上一个类型的填写的数据存起来("集群模块IP选择器"和"GSEKit IP选择器"的code与"ip选择器"code相同,需要单独处理)
const sameIpSelectorCode = ['set_module_ip_selector', 'gse_kit_ip_selector']
const valData = sameIpSelectorCode.includes(oldValue)
? { [oldValue]: tools.deepClone(this.renderData['ip_selector']) }
: tools.deepClone(this.renderData)
Object.assign(this.varTypeData, valData)
// 将input textarea类型正则存起来
Expand All @@ -677,7 +678,7 @@
})
if (val in this.varTypeData) {
const value = this.varTypeData[val]
this.renderData = { [val === 'set_module_ip_selector' ? 'ip_selector' : val]: value }
this.renderData = { [sameIpSelectorCode.includes(val) ? 'ip_selector' : val]: value }
} else {
this.renderData = {}
}
Expand Down
9 changes: 8 additions & 1 deletion gcloud/apigw/views/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def create_task(request, template_id, project_id):
"code": err_code.REQUEST_PARAM_INVALID.code,
"message": f"callback_url format error, must match {CALLBACK_URL_PATTERN}",
}
callback_version = params.get("callback_version", None)

# 兼容老版本的接口调用
if template_source in NON_COMMON_TEMPLATE_TYPES:
Expand Down Expand Up @@ -214,7 +215,13 @@ def create_task(request, template_id, project_id):

# create callback url record
if callback_url:
TaskCallBackRecord.objects.create(task_id=task.id, url=callback_url)
record_kwargs = {
"task_id": task.id,
"url": callback_url,
}
if callback_version:
record_kwargs["extra_info"] = json.dumps({"callback_version": callback_version})
TaskCallBackRecord.objects.create(**record_kwargs)

# crete auto retry strategy
arn_creator = AutoRetryNodeStrategyCreator(taskflow_id=task.id, root_pipeline_id=task.pipeline_instance.instance_id)
Expand Down
9 changes: 7 additions & 2 deletions gcloud/taskflow3/domains/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class TaskCallBacker:
def __init__(self, task_id, *args, **kwargs):
self.task_id = task_id
self.record = TaskCallBackRecord.objects.filter(task_id=self.task_id).first()
self.extra_info = {"task_id": self.task_id, **json.loads(self.record.extra_info), **kwargs}
self.record_extra_info = json.loads(self.record.extra_info)
self.extra_info = {"task_id": self.task_id, **self.record_extra_info, **kwargs}

def check_record_existence(self):
return True if self.record else False
Expand Down Expand Up @@ -96,9 +97,13 @@ def _url_callback(self):
logger.error(f"[TaskCallBacker _url_callback] get lock error: {err}")
return None
url = self.record.url
callback_version = self.record_extra_info.get("callback_version")
response = None
try:
response = requests.post(url, data=self.extra_info)
if callback_version == TaskCallBackRecord.CALLBACK_VERSION_V2:
response = requests.post(url, json=self.extra_info)
else:
response = requests.post(url, data=self.extra_info)
response.raise_for_status()
except HTTPError as e:
message = (
Expand Down
2 changes: 2 additions & 0 deletions gcloud/taskflow3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,8 @@ class Meta:


class TaskCallBackRecord(models.Model):
CALLBACK_VERSION_V2 = "v2"

id = models.BigAutoField(verbose_name="ID", primary_key=True)
task_id = models.BigIntegerField(verbose_name=_("任务ID"), db_index=True)
url = models.TextField(verbose_name=_("回调地址"))
Expand Down
41 changes: 24 additions & 17 deletions gcloud/taskflow3/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@


def _finish_taskflow_and_send_signal(instance_id, sig, task_success=False):
qs = TaskFlowInstance.objects.filter(pipeline_instance__instance_id=instance_id).only("id")
if not qs:
task = TaskFlowInstance.objects.filter(pipeline_instance__instance_id=instance_id).first()
if not task:
logger.error("pipeline archive handler get taskflow error, pipeline_instance_id={}".format(instance_id))
return

task_id = qs[0].id

TaskFlowInstance.objects.filter(id=task_id).update(current_flow="finished")
sig.send(TaskFlowInstance, task_id=task_id)
TaskFlowInstance.objects.filter(id=task.id).update(current_flow="finished")
sig.send(TaskFlowInstance, task_id=task.id)

if task_success:
_check_and_callback(task_id, task_success=task_success, task=qs[0])
_check_and_callback(task, task_success=task_success)
try:
send_taskflow_message.delay(task_id=task_id, msg_type=TASK_FINISHED)
send_taskflow_message.delay(task_id=task.id, msg_type=TASK_FINISHED)
except Exception as e:
logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task_id, e))
logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task.id, e))

if sig is taskflow_revoked:
_check_and_callback(task_id, task_success=False, task=qs[0])
_check_and_callback(task, task_success=False)


def _send_node_fail_message(node_id, pipeline_id):
Expand All @@ -74,7 +72,7 @@ def _send_node_fail_message(node_id, pipeline_id):
except TaskFlowInstance.DoesNotExist:
logger.error("pipeline finished handler get taskflow error, pipeline_instance_id=%s" % pipeline_id)
return
_check_and_callback(taskflow.id, task_success=False, task=taskflow)
_check_and_callback(taskflow, task_success=False)

if taskflow.is_child_taskflow is False:
try:
Expand All @@ -85,15 +83,24 @@ def _send_node_fail_message(node_id, pipeline_id):
logger.exception("pipeline_fail_handler[taskflow_id=%s] task delay error: %s" % (taskflow.id, e))


def _check_and_callback(taskflow_id, *args, **kwargs):
if not TaskCallBackRecord.objects.filter(task_id=taskflow_id).exists():
def _check_and_callback(task, *args, **kwargs):
record = TaskCallBackRecord.objects.filter(task_id=task.id).first()
if not record:
return
try:
if kwargs.get("task"):
task = kwargs.pop("task")
kwargs["task_outputs"] = json.dumps(task.get_task_detail()["outputs"])
if (
record.url
and json.loads(record.extra_info).get("callback_version") == TaskCallBackRecord.CALLBACK_VERSION_V2
):
# 检查任务的输出是否可以被json序列化,如果可以则将输出作为参数传给回调函数,否则不做处理
try:
task_outputs = task.get_task_detail()["outputs"]
json.dumps(task_outputs)
kwargs["task_outputs"] = task_outputs
except Exception as e:
logger.exception(f"[task {task.id}] outputs data serialize error: {e}")
task_callback.apply_async(
kwargs=dict(task_id=taskflow_id, **kwargs),
kwargs=dict(task_id=task.id, **kwargs),
queue="task_callback",
routing_key="task_callback",
)
Expand Down

0 comments on commit 03322e4

Please sign in to comment.