Skip to content

Commit

Permalink
added and fix comments to to invoke_in_new_event_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 3, 2024
1 parent b124671 commit 26d47bc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 41 deletions.
24 changes: 13 additions & 11 deletions osbot_utils/utils/Threads.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import typing
from concurrent.futures import ThreadPoolExecutor

def invoke_async_function(target: typing.Coroutine):
"""Run an asynchronous coroutine in a new event loop."""
Expand All @@ -27,17 +28,18 @@ def invoke_async_function(target: typing.Coroutine):
logger.level = level_original # restore the original log level


def invoke_in_new_event_loop(target: typing.Coroutine):
def run_in_new_loop():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
def invoke_in_new_event_loop(target: typing.Coroutine): # Runs a coroutine in a new event loop in a separate thread and returns the result
def run_in_new_loop(): # Function to run the coroutine in a new event loop
new_loop = asyncio.new_event_loop() # Create a new event loop
asyncio.set_event_loop(new_loop) # Set the new event loop as the current event loop
try:
return new_loop.run_until_complete(target)
return new_loop.run_until_complete(target) # Run the coroutine in the new event loop
finally:
new_loop.close()
new_loop.close() # Close the event loop to free resources

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
future = pool.submit(run_in_new_loop)
result = future.result()
return result
with ThreadPoolExecutor() as pool: # Create a thread pool executor
future = pool.submit(run_in_new_loop) # Submit the function to run in the thread pool
result = future.result() # Wait for the result of the future
return result # Return the result from the coroutine

async_invoke_in_new_loop = invoke_in_new_event_loop
56 changes: 26 additions & 30 deletions tests/unit/utils/test_Threads.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import asyncio
import typing
from unittest import TestCase

import pytest

from osbot_utils.utils.Dev import pprint

from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.context_managers.async_invoke import async_invoke
from osbot_utils.utils.Threads import invoke_async_function, invoke_in_new_event_loop
import typing
from unittest import TestCase
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.context_managers.async_invoke import async_invoke
from osbot_utils.utils.Threads import invoke_async_function, invoke_in_new_event_loop


class An_Class(Type_Safe):
Expand Down Expand Up @@ -43,44 +39,44 @@ def test__with_multiple_event_loops(self):
assert _(self.an_class.an_async_method()) == 49
assert _(self.an_class.an_async_method()) == 50

def test_invoke_in_new_event_loop(self):
async def an_method():
return 42
assert invoke_in_new_event_loop(an_method()) == 42
def test_invoke_in_new_event_loop(self): # Test the invoke_in_new_event_loop function
async def an_method(): # Define an async method that returns 42
return 42 # Return 42
assert invoke_in_new_event_loop(an_method()) == 42 # Assert that the helper function returns 42 when running an_method

async def inception_level_1(value_1):
async def inception_level_3(value_3):
return value_3
def inception_level_2(value_2):
return invoke_in_new_event_loop(inception_level_3(value_2+1))
return inception_level_2(value_1+1)
async def inception_level_1(value_1): # Define an async function with nested functions
async def inception_level_3(value_3): # Define a nested async function that returns its input
return value_3 # Return value_3
def inception_level_2(value_2): # Define a nested sync function that runs inception_level_3
return invoke_in_new_event_loop(inception_level_3(value_2+1)) # Run inception_level_3 in a new event loop and return the result
return inception_level_2(value_1+1) # Call inception_level_2 with value_1 + 1
assert invoke_in_new_event_loop(inception_level_1(1)) == 3 # Assert that the helper function works with nested async calls

assert invoke_in_new_event_loop(inception_level_1(1)) == 3

def test_invoke_in_new_event_loop__exception(self): # Test handling of exceptions within the coroutine
def test_invoke_in_new_event_loop__exception(self): # Test handling of exceptions within the coroutine
async def failing_coro():
raise ValueError("Test exception")

with pytest.raises(ValueError, match="Test exception") as context:
invoke_in_new_event_loop(failing_coro())
assert context.value.args[0] == "Test exception" # Assert exception raised
assert context.value.args[0] == "Test exception" # Assert exception raised


def test_invoke_in_new_event_loop__non_coroutine(self): # Test handling of non-coroutine inputs
def test_invoke_in_new_event_loop__non_coroutine(self): # Test handling of non-coroutine inputs
with pytest.raises(TypeError) as context:
invoke_in_new_event_loop(42)
assert context.value.args[0] == "An asyncio.Future, a coroutine or an awaitable is required"


def test_invoke_in_new_event_loop__asyncio_sleep(self): # Test with an async I/O operation
def test_invoke_in_new_event_loop__asyncio_sleep(self): # Test with an async I/O operation
async def sleep_coro():
await asyncio.sleep(0.1)
return 'slept'

result = invoke_in_new_event_loop(sleep_coro()) # Assert that the coroutine returns the expected result after sleep
result = invoke_in_new_event_loop(sleep_coro()) # Assert that the coroutine returns the expected result after sleep
assert result == 'slept'

def test_invoke_in_new_event_loop__concurrent(self): # Test concurrent invocations of the helper function
def test_invoke_in_new_event_loop__concurrent(self): # Test concurrent invocations of the helper function
async def coro(n):
await asyncio.sleep(0.1)
return n
Expand All @@ -89,20 +85,20 @@ async def coro(n):
with ThreadPoolExecutor() as executor:
futures = [executor.submit(invoke_in_new_event_loop, coro(i)) for i in range(5)]
results = [future.result() for future in futures]
assert results == [0, 1, 2, 3, 4] # Assert that all concurrent invocations return correct results
assert results == [0, 1, 2, 3, 4] # Assert that all concurrent invocations return correct results

def test_invoke_in_new_event_loop__nested_calls(self): # Test nested calls to the helper function
def test_invoke_in_new_event_loop__nested_calls(self): # Test nested calls to the helper function
async def coro(n):
if n <= 0:
return n
else:
return invoke_in_new_event_loop(coro(n - 1)) + 1

result = invoke_in_new_event_loop(coro(5))
assert result == 5 # Assert that nested invocations return the correct cumulative result
assert result == 5 # Assert that nested invocations return the correct cumulative result


def test_invoke_in_new_event_loop__multiple_invocations(self): # Test multiple successive invocations
def test_invoke_in_new_event_loop__multiple_invocations(self): # Test multiple successive invocations
async def coro():
return 42

Expand Down

0 comments on commit 26d47bc

Please sign in to comment.