Skip to content

Commit

Permalink
starting to add support for capturing artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 9, 2024
1 parent 85ae646 commit 62b6841
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
29 changes: 23 additions & 6 deletions osbot_utils/helpers/flows/Flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import typing

from osbot_prefect.server.Prefect__Artifacts import Prefect__Artifacts
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.helpers.CFormat import CFormat, f_dark_grey, f_magenta, f_bold
from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config
Expand Down Expand Up @@ -34,7 +35,20 @@ class Flow(Type_Safe):
cformat : CFormat
executed_tasks : typing.List

def add_flow_artifact(self, description=None, key=None, data=None, artifact_type=None): # todo: figure out how to make this work since at the moment most are showing an unknown type in the prefect UI
result_data = dict(flow_run_id = self.flow_id,
description = description or 'description',
key = key or 'an-artifact-key',
data = data or {"link": "https://www.google.com", "link_text": "link to Google"}, # test data to see if it worksw
type = artifact_type or Prefect__Artifacts.LINK ) # type clashed with built-in type

flow_events.on__new_artifact(self, result_data )

def add_flow_result(self, key, description):
result_data = dict(flow_run_id = self.flow_id,
key = key ,
description = description )
flow_events.on__new_result(self, result_data )

def config_logger(self):
with self.logger as _:
Expand Down Expand Up @@ -70,15 +84,11 @@ def execute_flow(self):
if self.flow_config.log_to_memory:
self.captured_exec_logs = self.log_messages_with_colors()
self.logger.remove_memory_logger() # todo: move to method that does post-execute tasks
if self.flow_return_value:
self.add_flow_result(key = 'flow-return-value', description=f'{self.flow_return_value}')
flow_events.on__flow__stop(self)
return self

def f__flow_id(self):
return self.cformat.green(self.flow_id)

def f__flow_name(self):
return self.cformat.blue(self.flow_name)

def captured_logs(self):
return ansis_to_texts(self.captured_exec_logs)

Expand All @@ -93,6 +103,13 @@ def invoke_flow_target(self):
else:
self.flow_return_value = self.flow_target(*self.flow_args, **self.flow_kwargs) # if the flow is sync, just execute the flow target

def f__flow_id(self):
return self.cformat.green(self.flow_id)

def f__flow_name(self):
return self.cformat.blue(self.flow_name)


def log_captured_stdout(self, stdout):
for line in stdout.value().splitlines():
if line:
Expand Down
8 changes: 8 additions & 0 deletions osbot_utils/helpers/flows/Flow__Events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ def on__flow_run__message(self, flow, log_level, flow_run_id, task_run_id, messa
flow_event = Flow__Event(event_type=Flow__Event_Type.FLOW_MESSAGE, event_source=flow, event_data=event_data)
self.raise_event(flow_event)

def on__new_artifact(self, flow, result_data):
flow_event = Flow__Event(event_type=Flow__Event_Type.NEW_ARTIFACT , event_source=flow, event_data=result_data)
self.raise_event(flow_event)

def on__new_result(self, flow, result_data):
flow_event = Flow__Event(event_type=Flow__Event_Type.NEW_RESULT , event_source=flow, event_data=result_data)
self.raise_event(flow_event)

def on__task__start(self, task):
flow_event = Flow__Event(event_type=Flow__Event_Type.TASK_START, event_source=task)
self.raise_event(flow_event)
Expand Down
2 changes: 2 additions & 0 deletions osbot_utils/helpers/flows/models/Flow__Event_Type.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ class Flow__Event_Type(Type_Safe):
FLOW_MESSAGE: str = 'flow_message'
FLOW_START : str = 'flow_start'
FLOW_STOP : str = 'flow_stop'
NEW_ARTIFACT: str = 'new_artifact'
NEW_RESULT : str = 'new_result'
TASK_START : str = 'task_start'
TASK_STOP : str = 'task_stop'

0 comments on commit 62b6841

Please sign in to comment.