Skip to content

Commit

Permalink
added very important method to Flows async workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 4, 2024
1 parent 111e999 commit 0439b05
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions osbot_utils/helpers/flows/Flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ def captured_logs(self):
def info(self, message):
self.logger.info(message)


async def invoke_flow_target__thread(self, flow): # this is a REALLY important method which is used to pin the flow object to the call stack
return await flow.flow_target(*flow.flow_args, **flow.flow_kwargs) # which is then used by the Task.find_flow method to find it

def invoke_flow_target(self):
if asyncio.iscoroutinefunction(self.flow_target):
#self.flow_return_value = invoke_async(self.flow_target(*self.flow_args, **self.flow_kwargs))
self.flow_return_value = invoke_in_new_event_loop(self.flow_target(*self.flow_args, **self.flow_kwargs))
async_coroutine = self.invoke_flow_target__thread(self) # use this special method to pin the flow object to the call stack
self.flow_return_value = invoke_in_new_event_loop(async_coroutine) # this will start a complete new thread to execute the flow (which is exactly what we want)
else:
self.flow_return_value = self.flow_target(*self.flow_args, **self.flow_kwargs)
self.flow_return_value = self.flow_target(*self.flow_args, **self.flow_kwargs) # if the flow is sync, just execute the flow target

def log_captured_stdout(self, stdout):
for line in stdout.value().splitlines():
Expand Down

0 comments on commit 0439b05

Please sign in to comment.