Skip to content

Commit

Permalink
feature: 支持第三方插件回调
Browse files Browse the repository at this point in the history
  • Loading branch information
normal-wls committed Nov 29, 2023
1 parent 8333b5f commit 7ca30fa
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions pipeline_plugins/components/collections/remote_plugin/v1_0_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
"""
import logging

from django.utils.translation import ugettext_lazy as _
from pipeline.component_framework.component import Component
from pipeline.core.flow import Service, StaticIntervalGenerator
from pipeline.core.flow import AbstractIntervalGenerator, Service
from pipeline.core.flow.io import StringItemSchema

from pipeline_plugins.components.utils.sites.open.utils import get_node_callback_url
from plugin_service.conf import PLUGIN_LOGGER
from plugin_service.exceptions import PluginServiceException
from plugin_service.plugin_client import PluginServiceApiClient
from django.utils.translation import ugettext_lazy as _

logger = logging.getLogger(PLUGIN_LOGGER)

Expand All @@ -34,8 +36,19 @@ class State:
UNFINISHED_STATES = {State.POLL, State.CALLBACK}


class StepIntervalGenerator(AbstractIntervalGenerator):
def __init__(self):
super(StepIntervalGenerator, self).__init__()
self.fix_interval = None

def next(self):
super(StepIntervalGenerator, self).next()
# 最小 10s,最大 3600s 一次
return self.fix_interval or (10 if self.count < 30 else min((self.count - 25) ** 2, 3600))


class RemotePluginService(Service):
interval = StaticIntervalGenerator(5)
interval = StepIntervalGenerator()

def outputs_format(self):
return [
Expand Down Expand Up @@ -70,11 +83,23 @@ def execute(self, data, parent_data):
if key in parent_data.inputs
]
)

# 处理回调的情况
if detail_result["data"].get("enable_plugin_callback"):
logger.info("回调的节点,需要发送回调")
self.interval = None
plugin_context.update(
{
"plugin_callback_info": {
"url": get_node_callback_url(self.root_pipeline_id, self.id, getattr(self, "version", "")),
"data": {},
}
}
)

ok, result_data = plugin_client.invoke(plugin_version, {"inputs": data.inputs, "context": plugin_context})
if not ok:
message = _(
f"调用第三方插件invoke接口错误, 错误内容: {result_data['message']}, trace_id: {result_data.get('trace_id')}"
)
message = _(f"调用第三方插件invoke接口错误, 错误内容: {result_data['message']}, trace_id: {result_data.get('trace_id')}")
logger.error(message)
data.set_outputs("ex_data", message)
return False
Expand Down

0 comments on commit 7ca30fa

Please sign in to comment.