From fc4db39e8bcc69c50699611aaa52fe4295090e6c Mon Sep 17 00:00:00 2001 From: Thiago Castro Ferreira Date: Wed, 27 Sep 2023 14:35:03 +0000 Subject: [PATCH] Objectify pipeline output --- aixplain/factories/pipeline_output_factory.py | 176 ++++++++++++++++++ aixplain/modules/pipeline.py | 64 ++++--- aixplain/outputs/__init__.py | 20 ++ aixplain/outputs/pipelines/__init__.py | 36 ++++ aixplain/outputs/pipelines/input_info.py | 38 ++++ aixplain/outputs/pipelines/output_node.py | 35 ++++ aixplain/outputs/pipelines/output_segment.py | 41 ++++ 7 files changed, 386 insertions(+), 24 deletions(-) create mode 100644 aixplain/factories/pipeline_output_factory.py create mode 100644 aixplain/outputs/__init__.py create mode 100644 aixplain/outputs/pipelines/__init__.py create mode 100644 aixplain/outputs/pipelines/input_info.py create mode 100644 aixplain/outputs/pipelines/output_node.py create mode 100644 aixplain/outputs/pipelines/output_segment.py diff --git a/aixplain/factories/pipeline_output_factory.py b/aixplain/factories/pipeline_output_factory.py new file mode 100644 index 00000000..90982a93 --- /dev/null +++ b/aixplain/factories/pipeline_output_factory.py @@ -0,0 +1,176 @@ +__author__ = "thiagocastroferreira" + +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Pipeline Output Factory Class +""" + +from aixplain.enums.data_type import DataType +from aixplain.enums.language import Language +from aixplain.outputs.pipelines import PipelineOutput +from aixplain.outputs.pipelines.input_info import InputInfo +from aixplain.outputs.pipelines.output_node import OutputNode +from aixplain.outputs.pipelines.output_segment import OutputSegment +from typing import Dict, Text + + +def get_data_type(data_type_str: Text) -> DataType: + """Data Type Getter + + Args: + data_type_str (Text): data type value + + Raises: + Exception: data type value does not exist + + Returns: + DataType: data type enumerator + """ + try: + data_type = DataType(data_type_str) + except Exception: + raise Exception(f"Data type '{data_type_str}' does not exist.") + return data_type + + +def get_language(language_str: Text) -> Language: + """Language Getter + + Args: + language_str (Text): language value string + + Raises: + Exception: language value does not exist + + Returns: + Language: language enumerator + """ + try: + language = Language({"language": language_str, "dialect": ""}) + except Exception: + raise Exception(f"Language '{language_str}' does not exist.") + return language + + +def create_input_info(input_info_dict: Dict) -> InputInfo: + """InputInfo setter + + Args: + input_info_dict (Dict): input info dictionary + + Returns: + InputInfo: InputInfo object + """ + start = input_info_dict["start"] + end = input_info_dict["end"] + length = input_info_dict["length"] + is_url = input_info_dict["is_url"] + data_type = get_data_type(input_info_dict["type"]) + input_segment = input_info_dict["segment"] + language = None + if "language" in input_info_dict: + language = get_language(input_info_dict["language"]) + + return InputInfo( + start=start, end=end, length=length, is_url=is_url, language=language, data_type=data_type, input_segment=input_segment + ) + + +def create_output_segment(output_segment_dict: Dict) -> OutputSegment: + """OutputSegment Setter + + Args: + output_segment_dict (Dict): output segment dictionary + + Returns: + OutputSegment: OutputSegment instance + """ + index = output_segment_dict["index"] + status = "SUCCESS" if output_segment_dict["success"] is True else "FAILED" + response = output_segment_dict["response"] + is_url = output_segment_dict["is_url"] + data_type = get_data_type(output_segment_dict["output_type"]) + language = None + if "language" in output_segment_dict: + language = get_language(output_segment_dict["language"]) + details = output_segment_dict["details"] + supplier_response = None + if "rawData" in details: + supplier_response = details["rawData"] + del details["rawData"] + input_info = [create_input_info(info) for info in output_segment_dict["input_segment_info"]] + + return OutputSegment( + index=index, + status=status, + response=response, + is_url=is_url, + data_type=data_type, + details=details, + supplier_response=supplier_response, + language=language, + input_info=input_info, + ) + + +def create_output_node(output_node_dict: Dict) -> OutputNode: + """OutputNode Setter + + Args: + output_node_dict (Dict): output node dictionary + + Returns: + OutputNode: OutputNode instance + """ + node_id = output_node_dict["node_id"] + label = output_node_dict["label"] + path = output_node_dict["path"] + outputs = [create_output_segment(output_segment_dict) for output_segment_dict in output_node_dict["segments"]] + is_segmented = output_node_dict["is_segmented"] + + return OutputNode(node_id=node_id, label=label, path=path, outputs=outputs, is_segmented=is_segmented) + + +class PipelineOutputFactory: + """A static class for creating and exploring Pipeline Output Objects.""" + + @classmethod + def create_success_output(cls, response: Dict) -> PipelineOutput: + elapsed_time = response["elapsed_time"] + used_credits = response["used_credits"] + output_nodes = [create_output_node(output_node_dict) for output_node_dict in response["data"]] + + return PipelineOutput(status="SUCCESS", output_nodes=output_nodes, elapsed_time=elapsed_time, used_credits=used_credits) + + @classmethod + def create_in_progress_output(cls, progress: Text, elapsed_time: float = 0, used_credits: float = 0) -> PipelineOutput: + return PipelineOutput( + status="IN_PROGRESS", output_nodes=[], elapsed_time=elapsed_time, used_credits=used_credits, progress=progress + ) + + @classmethod + def create_fail_output(cls, error_message: Text, elapsed_time: float = 0, used_credits: float = 0) -> PipelineOutput: + return PipelineOutput( + status="FAILED", + output_nodes=[], + elapsed_time=elapsed_time, + used_credits=used_credits, + progress="0%", + error_message=error_message, + ) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 8749409a..5bd81459 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -24,7 +24,9 @@ import time import json import logging +from aixplain.factories.pipeline_output_factory import PipelineOutputFactory from aixplain.modules.asset import Asset +from aixplain.outputs.pipelines import PipelineOutput from aixplain.utils import config from aixplain.utils.file_utils import _request_with_retry from typing import Dict, Optional, Text, Union @@ -71,7 +73,7 @@ def __init__( def __polling( self, poll_url: Text, name: Text = "pipeline_process", wait_time: float = 1.0, timeout: float = 20000.0 - ) -> Dict: + ) -> PipelineOutput: """Keeps polling the platform to check whether an asynchronous call is done. Args: @@ -81,18 +83,19 @@ def __polling( timeout (float, optional): total polling time. Defaults to 20000.0. Returns: - dict: response obtained by polling call + PipelineOutput: PipelineOutput instance """ # TO DO: wait_time = to the longest path of the pipeline * minimum waiting time - logging.debug(f"Polling for Pipeline: Start polling for {name} ") + logging.debug(f"Polling for Pipeline: Start polling for {name}") start, end = time.time(), time.time() completed = False - response_body = {"status": "FAILED"} while not completed and (end - start) < timeout: try: - response_body = self.poll(poll_url, name=name) - logging.debug(f"Polling for Pipeline: Status of polling for {name} : {response_body}") - completed = response_body["completed"] + pipeline_output = self.poll(poll_url, name=name) + logging.debug(f"Polling for Pipeline: Status of polling for {name} : {pipeline_output.__dict__}") + + if pipeline_output.status in ["FAILED", "SUCCESS"]: + completed = True end = time.time() if completed is False: @@ -101,18 +104,18 @@ def __polling( wait_time *= 1.1 except Exception as e: logging.error(f"Polling for Pipeline: polling for {name} : Continue") - if response_body and response_body["status"] == "SUCCESS": - try: - logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS - {response_body}") - except Exception as e: - logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR - {response_body}") + + if pipeline_output.status == "SUCCESS": + logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS") + elif pipeline_output.status == "FAILED": + logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR") else: logging.error( f"Polling for Pipeline: Final status of polling for {name} : No response in {timeout} seconds - {response_body}" ) - return response_body + return pipeline_output - def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: + def poll(self, poll_url: Text, name: Text = "pipeline_process") -> PipelineOutput: """Poll the platform to check whether an asynchronous call is done. Args: @@ -120,17 +123,26 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: name (Text, optional): ID given to a call. Defaults to "pipeline_process". Returns: - Dict: response obtained by polling call + PipelineOutput: PipelineOutput instance """ headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} r = _request_with_retry("get", poll_url, headers=headers) try: resp = r.json() + if resp["status"] == "IN_PROGRESS": + pipeline_output = PipelineOutputFactory.create_in_progress_output(progress=resp["progress"]) + elif resp["status"] == "SUCCESS": + pipeline_output = PipelineOutputFactory.create_success_output(response=resp) + else: + error_message = "" + if "error" in resp: + error_message = resp["error"] + pipeline_output = PipelineOutputFactory.create_fail_output(error_message=error_message) logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}") except Exception as e: - resp = {"status": "FAILED"} - return resp + pipeline_output = PipelineOutputFactory.create_fail_output(error_message=str(e)) + return pipeline_output def run( self, @@ -139,7 +151,7 @@ def run( name: Text = "pipeline_process", timeout: float = 20000.0, wait_time: float = 1.0, - ) -> Dict: + ) -> PipelineOutput: """Runs a pipeline call. Args: @@ -150,25 +162,29 @@ def run( wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0. Returns: - Dict: parsed output from pipeline + PipelineOutput: PipelineOutput instance """ start = time.time() try: response = self.run_async(data, data_asset=data_asset, name=name) if response["status"] == "FAILED": end = time.time() - response["elapsed_time"] = end - start - return response + elapsed_time = end - start + error_message = "" + if "error" in response: + error_message = response["error"] + return PipelineOutputFactory.create_fail_output(error_message=error_message, elapsed_time=elapsed_time) poll_url = response["url"] end = time.time() - response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) - return response + pipeline_output = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) + return pipeline_output except Exception as e: error_message = f"Error in request for {name}: {str(e)}" logging.error(error_message) logging.exception(error_message) end = time.time() - return {"status": "FAILED", "error": error_message, "elapsed_time": end - start} + elapsed_time = end - start + return PipelineOutputFactory.create_fail_output(error_message=error_message, elapsed_time=elapsed_time) def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[Text, Dict]] = None) -> Dict: """Prepare pipeline execution payload, validating the input data diff --git a/aixplain/outputs/__init__.py b/aixplain/outputs/__init__.py new file mode 100644 index 00000000..688a7f3f --- /dev/null +++ b/aixplain/outputs/__init__.py @@ -0,0 +1,20 @@ +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Output folder +""" diff --git a/aixplain/outputs/pipelines/__init__.py b/aixplain/outputs/pipelines/__init__.py new file mode 100644 index 00000000..f9847021 --- /dev/null +++ b/aixplain/outputs/pipelines/__init__.py @@ -0,0 +1,36 @@ +__author__ = "thiagocastroferreira" + +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Pipeline Output Class +""" + +from aixplain.outputs.pipelines.output_node import OutputNode +from dataclasses import dataclass +from typing import List, Optional, Text + + +@dataclass +class PipelineOutput: + status: Text + output_nodes: List[OutputNode] + elapsed_time: float + used_credits: float + progress: Text = "100%" + error_message: Optional[Text] = None diff --git a/aixplain/outputs/pipelines/input_info.py b/aixplain/outputs/pipelines/input_info.py new file mode 100644 index 00000000..3be0e8d8 --- /dev/null +++ b/aixplain/outputs/pipelines/input_info.py @@ -0,0 +1,38 @@ +__author__ = "thiagocastroferreira" + +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Input Segment Info Class +""" + +from aixplain.enums.data_type import DataType +from aixplain.enums.language import Language +from dataclasses import dataclass +from typing import Any, Optional, Union + + +@dataclass +class InputInfo: + start: Union[float, int] + end: Union[float, int] + length: Union[float, int] + is_url: bool + data_type: DataType + input_segment: Any + language: Optional[Language] = None diff --git a/aixplain/outputs/pipelines/output_node.py b/aixplain/outputs/pipelines/output_node.py new file mode 100644 index 00000000..8b3da945 --- /dev/null +++ b/aixplain/outputs/pipelines/output_node.py @@ -0,0 +1,35 @@ +__author__ = "thiagocastroferreira" + +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Output Node Class +""" + +from aixplain.outputs.pipelines.output_segment import OutputSegment +from dataclasses import dataclass +from typing import Dict, List, Text + + +@dataclass +class OutputNode: + node_id: int + label: Text + path: List[Dict] + is_segmented: bool + outputs: List[OutputSegment] diff --git a/aixplain/outputs/pipelines/output_segment.py b/aixplain/outputs/pipelines/output_segment.py new file mode 100644 index 00000000..7e82aa07 --- /dev/null +++ b/aixplain/outputs/pipelines/output_segment.py @@ -0,0 +1,41 @@ +__author__ = "thiagocastroferreira" + +""" +Copyright 2023 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira +Date: September 27th 2023 +Description: + Output Segment Class +""" + +from aixplain.enums.data_type import DataType +from aixplain.enums.language import Language +from aixplain.outputs.pipelines.input_info import InputInfo +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Text + + +@dataclass +class OutputSegment: + index: int + status: Text + response: Any + is_url: bool + data_type: DataType + details: Dict = field(default_factory=dict) + supplier_response: Optional[Any] = None + language: Optional[Language] = None + input_info: Optional[InputInfo] = None