From 26d47bc6d247a79872332e620ed989ad3230412b Mon Sep 17 00:00:00 2001 From: Dinis Cruz Date: Thu, 3 Oct 2024 14:34:50 +0100 Subject: [PATCH] added and fix comments to to invoke_in_new_event_loop --- osbot_utils/utils/Threads.py | 24 +++++++------- tests/unit/utils/test_Threads.py | 56 +++++++++++++++----------------- 2 files changed, 39 insertions(+), 41 deletions(-) diff --git a/osbot_utils/utils/Threads.py b/osbot_utils/utils/Threads.py index de1d2b27..b238e9ca 100644 --- a/osbot_utils/utils/Threads.py +++ b/osbot_utils/utils/Threads.py @@ -1,6 +1,7 @@ import asyncio import logging import typing +from concurrent.futures import ThreadPoolExecutor def invoke_async_function(target: typing.Coroutine): """Run an asynchronous coroutine in a new event loop.""" @@ -27,17 +28,18 @@ def invoke_async_function(target: typing.Coroutine): logger.level = level_original # restore the original log level -def invoke_in_new_event_loop(target: typing.Coroutine): - def run_in_new_loop(): - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) +def invoke_in_new_event_loop(target: typing.Coroutine): # Runs a coroutine in a new event loop in a separate thread and returns the result + def run_in_new_loop(): # Function to run the coroutine in a new event loop + new_loop = asyncio.new_event_loop() # Create a new event loop + asyncio.set_event_loop(new_loop) # Set the new event loop as the current event loop try: - return new_loop.run_until_complete(target) + return new_loop.run_until_complete(target) # Run the coroutine in the new event loop finally: - new_loop.close() + new_loop.close() # Close the event loop to free resources - from concurrent.futures import ThreadPoolExecutor - with ThreadPoolExecutor() as pool: - future = pool.submit(run_in_new_loop) - result = future.result() - return result \ No newline at end of file + with ThreadPoolExecutor() as pool: # Create a thread pool executor + future = pool.submit(run_in_new_loop) # Submit the function to run in the thread pool + result = future.result() # Wait for the result of the future + return result # Return the result from the coroutine + +async_invoke_in_new_loop = invoke_in_new_event_loop \ No newline at end of file diff --git a/tests/unit/utils/test_Threads.py b/tests/unit/utils/test_Threads.py index 67389e4d..0ad8ef8b 100644 --- a/tests/unit/utils/test_Threads.py +++ b/tests/unit/utils/test_Threads.py @@ -1,14 +1,10 @@ import asyncio -import typing -from unittest import TestCase - import pytest - -from osbot_utils.utils.Dev import pprint - -from osbot_utils.base_classes.Type_Safe import Type_Safe -from osbot_utils.context_managers.async_invoke import async_invoke -from osbot_utils.utils.Threads import invoke_async_function, invoke_in_new_event_loop +import typing +from unittest import TestCase +from osbot_utils.base_classes.Type_Safe import Type_Safe +from osbot_utils.context_managers.async_invoke import async_invoke +from osbot_utils.utils.Threads import invoke_async_function, invoke_in_new_event_loop class An_Class(Type_Safe): @@ -43,44 +39,44 @@ def test__with_multiple_event_loops(self): assert _(self.an_class.an_async_method()) == 49 assert _(self.an_class.an_async_method()) == 50 - def test_invoke_in_new_event_loop(self): - async def an_method(): - return 42 - assert invoke_in_new_event_loop(an_method()) == 42 + def test_invoke_in_new_event_loop(self): # Test the invoke_in_new_event_loop function + async def an_method(): # Define an async method that returns 42 + return 42 # Return 42 + assert invoke_in_new_event_loop(an_method()) == 42 # Assert that the helper function returns 42 when running an_method - async def inception_level_1(value_1): - async def inception_level_3(value_3): - return value_3 - def inception_level_2(value_2): - return invoke_in_new_event_loop(inception_level_3(value_2+1)) - return inception_level_2(value_1+1) + async def inception_level_1(value_1): # Define an async function with nested functions + async def inception_level_3(value_3): # Define a nested async function that returns its input + return value_3 # Return value_3 + def inception_level_2(value_2): # Define a nested sync function that runs inception_level_3 + return invoke_in_new_event_loop(inception_level_3(value_2+1)) # Run inception_level_3 in a new event loop and return the result + return inception_level_2(value_1+1) # Call inception_level_2 with value_1 + 1 + assert invoke_in_new_event_loop(inception_level_1(1)) == 3 # Assert that the helper function works with nested async calls - assert invoke_in_new_event_loop(inception_level_1(1)) == 3 - def test_invoke_in_new_event_loop__exception(self): # Test handling of exceptions within the coroutine + def test_invoke_in_new_event_loop__exception(self): # Test handling of exceptions within the coroutine async def failing_coro(): raise ValueError("Test exception") with pytest.raises(ValueError, match="Test exception") as context: invoke_in_new_event_loop(failing_coro()) - assert context.value.args[0] == "Test exception" # Assert exception raised + assert context.value.args[0] == "Test exception" # Assert exception raised - def test_invoke_in_new_event_loop__non_coroutine(self): # Test handling of non-coroutine inputs + def test_invoke_in_new_event_loop__non_coroutine(self): # Test handling of non-coroutine inputs with pytest.raises(TypeError) as context: invoke_in_new_event_loop(42) assert context.value.args[0] == "An asyncio.Future, a coroutine or an awaitable is required" - def test_invoke_in_new_event_loop__asyncio_sleep(self): # Test with an async I/O operation + def test_invoke_in_new_event_loop__asyncio_sleep(self): # Test with an async I/O operation async def sleep_coro(): await asyncio.sleep(0.1) return 'slept' - result = invoke_in_new_event_loop(sleep_coro()) # Assert that the coroutine returns the expected result after sleep + result = invoke_in_new_event_loop(sleep_coro()) # Assert that the coroutine returns the expected result after sleep assert result == 'slept' - def test_invoke_in_new_event_loop__concurrent(self): # Test concurrent invocations of the helper function + def test_invoke_in_new_event_loop__concurrent(self): # Test concurrent invocations of the helper function async def coro(n): await asyncio.sleep(0.1) return n @@ -89,9 +85,9 @@ async def coro(n): with ThreadPoolExecutor() as executor: futures = [executor.submit(invoke_in_new_event_loop, coro(i)) for i in range(5)] results = [future.result() for future in futures] - assert results == [0, 1, 2, 3, 4] # Assert that all concurrent invocations return correct results + assert results == [0, 1, 2, 3, 4] # Assert that all concurrent invocations return correct results - def test_invoke_in_new_event_loop__nested_calls(self): # Test nested calls to the helper function + def test_invoke_in_new_event_loop__nested_calls(self): # Test nested calls to the helper function async def coro(n): if n <= 0: return n @@ -99,10 +95,10 @@ async def coro(n): return invoke_in_new_event_loop(coro(n - 1)) + 1 result = invoke_in_new_event_loop(coro(5)) - assert result == 5 # Assert that nested invocations return the correct cumulative result + assert result == 5 # Assert that nested invocations return the correct cumulative result - def test_invoke_in_new_event_loop__multiple_invocations(self): # Test multiple successive invocations + def test_invoke_in_new_event_loop__multiple_invocations(self): # Test multiple successive invocations async def coro(): return 42