Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Objectify pipeline output #81

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions aixplain/factories/pipeline_output_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
__author__ = "thiagocastroferreira"

"""
Copyright 2023 The aiXplain SDK authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Thiago Castro Ferreira
Date: September 27th 2023
Description:
Pipeline Output Factory Class
"""

from aixplain.enums.data_type import DataType
from aixplain.enums.language import Language
from aixplain.outputs.pipelines import PipelineOutput
from aixplain.outputs.pipelines.input_info import InputInfo
from aixplain.outputs.pipelines.output_node import OutputNode
from aixplain.outputs.pipelines.output_segment import OutputSegment
from typing import Dict, Text


def get_data_type(data_type_str: Text) -> DataType:
"""Data Type Getter

Args:
data_type_str (Text): data type value

Raises:
Exception: data type value does not exist

Returns:
DataType: data type enumerator
"""
try:
data_type = DataType(data_type_str)
except Exception:
raise Exception(f"Data type '{data_type_str}' does not exist.")
return data_type


def get_language(language_str: Text) -> Language:
"""Language Getter

Args:
language_str (Text): language value string

Raises:
Exception: language value does not exist

Returns:
Language: language enumerator
"""
try:
language = Language({"language": language_str, "dialect": ""})
except Exception:
raise Exception(f"Language '{language_str}' does not exist.")
return language


def create_input_info(input_info_dict: Dict) -> InputInfo:
"""InputInfo setter

Args:
input_info_dict (Dict): input info dictionary

Returns:
InputInfo: InputInfo object
"""
start = input_info_dict["start"]
end = input_info_dict["end"]
length = input_info_dict["length"]
is_url = input_info_dict["is_url"]
data_type = get_data_type(input_info_dict["type"])
input_segment = input_info_dict["segment"]
language = None
if "language" in input_info_dict:
language = get_language(input_info_dict["language"])

return InputInfo(
start=start, end=end, length=length, is_url=is_url, language=language, data_type=data_type, input_segment=input_segment
)


def create_output_segment(output_segment_dict: Dict) -> OutputSegment:
"""OutputSegment Setter

Args:
output_segment_dict (Dict): output segment dictionary

Returns:
OutputSegment: OutputSegment instance
"""
index = output_segment_dict["index"]
status = "SUCCESS" if output_segment_dict["success"] is True else "FAILED"
response = output_segment_dict["response"]
is_url = output_segment_dict["is_url"]
data_type = get_data_type(output_segment_dict["output_type"])
language = None
if "language" in output_segment_dict:
language = get_language(output_segment_dict["language"])
details = output_segment_dict["details"]
supplier_response = None
if "rawData" in details:
supplier_response = details["rawData"]
del details["rawData"]
input_info = [create_input_info(info) for info in output_segment_dict["input_segment_info"]]

return OutputSegment(
index=index,
status=status,
response=response,
is_url=is_url,
data_type=data_type,
details=details,
supplier_response=supplier_response,
language=language,
input_info=input_info,
)


def create_output_node(output_node_dict: Dict) -> OutputNode:
"""OutputNode Setter

Args:
output_node_dict (Dict): output node dictionary

Returns:
OutputNode: OutputNode instance
"""
node_id = output_node_dict["node_id"]
label = output_node_dict["label"]
path = output_node_dict["path"]
outputs = [create_output_segment(output_segment_dict) for output_segment_dict in output_node_dict["segments"]]
is_segmented = output_node_dict["is_segmented"]

return OutputNode(node_id=node_id, label=label, path=path, outputs=outputs, is_segmented=is_segmented)


class PipelineOutputFactory:
"""A static class for creating and exploring Pipeline Output Objects."""

@classmethod
def create_success_output(cls, response: Dict) -> PipelineOutput:
elapsed_time = response["elapsed_time"]
used_credits = response["used_credits"]
output_nodes = [create_output_node(output_node_dict) for output_node_dict in response["data"]]

return PipelineOutput(status="SUCCESS", output_nodes=output_nodes, elapsed_time=elapsed_time, used_credits=used_credits)

@classmethod
def create_in_progress_output(cls, progress: Text, elapsed_time: float = 0, used_credits: float = 0) -> PipelineOutput:
return PipelineOutput(
status="IN_PROGRESS", output_nodes=[], elapsed_time=elapsed_time, used_credits=used_credits, progress=progress
)

@classmethod
def create_fail_output(cls, error_message: Text, elapsed_time: float = 0, used_credits: float = 0) -> PipelineOutput:
return PipelineOutput(
status="FAILED",
output_nodes=[],
elapsed_time=elapsed_time,
used_credits=used_credits,
progress="0%",
error_message=error_message,
)
64 changes: 40 additions & 24 deletions aixplain/modules/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import time
import json
import logging
from aixplain.factories.pipeline_output_factory import PipelineOutputFactory
from aixplain.modules.asset import Asset
from aixplain.outputs.pipelines import PipelineOutput
from aixplain.utils import config
from aixplain.utils.file_utils import _request_with_retry
from typing import Dict, Optional, Text, Union
Expand Down Expand Up @@ -71,7 +73,7 @@ def __init__(

def __polling(
self, poll_url: Text, name: Text = "pipeline_process", wait_time: float = 1.0, timeout: float = 20000.0
) -> Dict:
) -> PipelineOutput:
"""Keeps polling the platform to check whether an asynchronous call is done.

Args:
Expand All @@ -81,18 +83,19 @@ def __polling(
timeout (float, optional): total polling time. Defaults to 20000.0.

Returns:
dict: response obtained by polling call
PipelineOutput: PipelineOutput instance
"""
# TO DO: wait_time = to the longest path of the pipeline * minimum waiting time
logging.debug(f"Polling for Pipeline: Start polling for {name} ")
logging.debug(f"Polling for Pipeline: Start polling for {name}")
start, end = time.time(), time.time()
completed = False
response_body = {"status": "FAILED"}
while not completed and (end - start) < timeout:
try:
response_body = self.poll(poll_url, name=name)
logging.debug(f"Polling for Pipeline: Status of polling for {name} : {response_body}")
completed = response_body["completed"]
pipeline_output = self.poll(poll_url, name=name)
logging.debug(f"Polling for Pipeline: Status of polling for {name} : {pipeline_output.__dict__}")

if pipeline_output.status in ["FAILED", "SUCCESS"]:
completed = True

end = time.time()
if completed is False:
Expand All @@ -101,36 +104,45 @@ def __polling(
wait_time *= 1.1
except Exception as e:
logging.error(f"Polling for Pipeline: polling for {name} : Continue")
if response_body and response_body["status"] == "SUCCESS":
try:
logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS - {response_body}")
except Exception as e:
logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR - {response_body}")

if pipeline_output.status == "SUCCESS":
logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS")
elif pipeline_output.status == "FAILED":
logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR")
else:
logging.error(
f"Polling for Pipeline: Final status of polling for {name} : No response in {timeout} seconds - {response_body}"
)
return response_body
return pipeline_output

def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict:
def poll(self, poll_url: Text, name: Text = "pipeline_process") -> PipelineOutput:
"""Poll the platform to check whether an asynchronous call is done.

Args:
poll_url (Text): polling URL
name (Text, optional): ID given to a call. Defaults to "pipeline_process".

Returns:
Dict: response obtained by polling call
PipelineOutput: PipelineOutput instance
"""

headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
r = _request_with_retry("get", poll_url, headers=headers)
try:
resp = r.json()
if resp["status"] == "IN_PROGRESS":
pipeline_output = PipelineOutputFactory.create_in_progress_output(progress=resp["progress"])
elif resp["status"] == "SUCCESS":
pipeline_output = PipelineOutputFactory.create_success_output(response=resp)
else:
error_message = ""
if "error" in resp:
error_message = resp["error"]
pipeline_output = PipelineOutputFactory.create_fail_output(error_message=error_message)
logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}")
except Exception as e:
resp = {"status": "FAILED"}
return resp
pipeline_output = PipelineOutputFactory.create_fail_output(error_message=str(e))
return pipeline_output

def run(
self,
Expand All @@ -139,7 +151,7 @@ def run(
name: Text = "pipeline_process",
timeout: float = 20000.0,
wait_time: float = 1.0,
) -> Dict:
) -> PipelineOutput:
"""Runs a pipeline call.

Args:
Expand All @@ -150,25 +162,29 @@ def run(
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0.

Returns:
Dict: parsed output from pipeline
PipelineOutput: PipelineOutput instance
"""
start = time.time()
try:
response = self.run_async(data, data_asset=data_asset, name=name)
if response["status"] == "FAILED":
end = time.time()
response["elapsed_time"] = end - start
return response
elapsed_time = end - start
error_message = ""
if "error" in response:
error_message = response["error"]
return PipelineOutputFactory.create_fail_output(error_message=error_message, elapsed_time=elapsed_time)
poll_url = response["url"]
end = time.time()
response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time)
return response
pipeline_output = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time)
return pipeline_output
except Exception as e:
error_message = f"Error in request for {name}: {str(e)}"
logging.error(error_message)
logging.exception(error_message)
end = time.time()
return {"status": "FAILED", "error": error_message, "elapsed_time": end - start}
elapsed_time = end - start
return PipelineOutputFactory.create_fail_output(error_message=error_message, elapsed_time=elapsed_time)

def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[Text, Dict]] = None) -> Dict:
"""Prepare pipeline execution payload, validating the input data
Expand Down
20 changes: 20 additions & 0 deletions aixplain/outputs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
Copyright 2023 The aiXplain SDK authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Thiago Castro Ferreira
Date: September 27th 2023
Description:
Output folder
"""
36 changes: 36 additions & 0 deletions aixplain/outputs/pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
__author__ = "thiagocastroferreira"

"""
Copyright 2023 The aiXplain SDK authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Thiago Castro Ferreira
Date: September 27th 2023
Description:
Pipeline Output Class
"""

from aixplain.outputs.pipelines.output_node import OutputNode
from dataclasses import dataclass
from typing import List, Optional, Text


@dataclass
class PipelineOutput:
status: Text
output_nodes: List[OutputNode]
elapsed_time: float
used_credits: float
progress: Text = "100%"
error_message: Optional[Text] = None
Loading