Skip to content

Commit

Permalink
Merge dev into main
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 3, 2024
2 parents e6492b7 + dc14592 commit a495ee2
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Powerful Python util methods and classes that simplify common apis and tasks.

![Current Release](https://img.shields.io/badge/release-v1.46.0-blue)
![Current Release](https://img.shields.io/badge/release-v1.46.3-blue)
[![codecov](https://codecov.io/gh/owasp-sbot/OSBot-Utils/graph/badge.svg?token=GNVW0COX1N)](https://codecov.io/gh/owasp-sbot/OSBot-Utils)


Expand Down
20 changes: 19 additions & 1 deletion osbot_utils/utils/Threads.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import typing
from concurrent.futures import ThreadPoolExecutor

def invoke_async_function(target: typing.Coroutine):
"""Run an asynchronous coroutine in a new event loop."""
Expand All @@ -24,4 +25,21 @@ def invoke_async_function(target: typing.Coroutine):
else:
asyncio.set_event_loop(None)

logger.level = level_original # restore the original log level
logger.level = level_original # restore the original log level


def invoke_in_new_event_loop(target: typing.Coroutine): # Runs a coroutine in a new event loop in a separate thread and returns the result
def run_in_new_loop(): # Function to run the coroutine in a new event loop
new_loop = asyncio.new_event_loop() # Create a new event loop
asyncio.set_event_loop(new_loop) # Set the new event loop as the current event loop
try:
return new_loop.run_until_complete(target) # Run the coroutine in the new event loop
finally:
new_loop.close() # Close the event loop to free resources

with ThreadPoolExecutor() as pool: # Create a thread pool executor
future = pool.submit(run_in_new_loop) # Submit the function to run in the thread pool
result = future.result() # Wait for the result of the future
return result # Return the result from the coroutine

async_invoke_in_new_loop = invoke_in_new_event_loop
2 changes: 1 addition & 1 deletion osbot_utils/utils/Toml.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ def toml_to_dict(str_toml):
raise NotImplementedError("TOML parsing is not supported in Python versions earlier than 3.11")
return tomllib.loads(str_toml)


json_load_file = toml_dict_from_file
toml_file_load = toml_dict_from_file
2 changes: 1 addition & 1 deletion osbot_utils/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.46.0
v1.46.3
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "osbot_utils"
version = "v1.46.0"
version = "v1.46.3"
description = "OWASP Security Bot - Utils"
authors = ["Dinis Cruz <[email protected]>"]
license = "MIT"
Expand Down
80 changes: 74 additions & 6 deletions tests/unit/utils/test_Threads.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import pytest
import typing
from unittest import TestCase

from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.context_managers.async_invoke import async_invoke
from osbot_utils.utils.Threads import invoke_async_function
from unittest import TestCase
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.context_managers.async_invoke import async_invoke
from osbot_utils.utils.Threads import invoke_async_function, invoke_in_new_event_loop


class An_Class(Type_Safe):
Expand Down Expand Up @@ -37,4 +37,72 @@ def test__with_multiple_event_loops(self):
assert invoke_async_function(self.an_class.an_async_method()) == 47
assert __(self.an_class.an_async_method()) == 48
assert _(self.an_class.an_async_method()) == 49
assert _(self.an_class.an_async_method()) == 50
assert _(self.an_class.an_async_method()) == 50

def test_invoke_in_new_event_loop(self): # Test the invoke_in_new_event_loop function
async def an_method(): # Define an async method that returns 42
return 42 # Return 42
assert invoke_in_new_event_loop(an_method()) == 42 # Assert that the helper function returns 42 when running an_method

async def inception_level_1(value_1): # Define an async function with nested functions
async def inception_level_3(value_3): # Define a nested async function that returns its input
return value_3 # Return value_3
def inception_level_2(value_2): # Define a nested sync function that runs inception_level_3
return invoke_in_new_event_loop(inception_level_3(value_2+1)) # Run inception_level_3 in a new event loop and return the result
return inception_level_2(value_1+1) # Call inception_level_2 with value_1 + 1
assert invoke_in_new_event_loop(inception_level_1(1)) == 3 # Assert that the helper function works with nested async calls


def test_invoke_in_new_event_loop__exception(self): # Test handling of exceptions within the coroutine
async def failing_coro():
raise ValueError("Test exception")

with pytest.raises(ValueError, match="Test exception") as context:
invoke_in_new_event_loop(failing_coro())
assert context.value.args[0] == "Test exception" # Assert exception raised


def test_invoke_in_new_event_loop__non_coroutine(self): # Test handling of non-coroutine inputs
with pytest.raises(TypeError) as context:
invoke_in_new_event_loop(42)
assert context.value.args[0] == "An asyncio.Future, a coroutine or an awaitable is required"


def test_invoke_in_new_event_loop__asyncio_sleep(self): # Test with an async I/O operation
async def sleep_coro():
await asyncio.sleep(0.1)
return 'slept'

result = invoke_in_new_event_loop(sleep_coro()) # Assert that the coroutine returns the expected result after sleep
assert result == 'slept'

def test_invoke_in_new_event_loop__concurrent(self): # Test concurrent invocations of the helper function
async def coro(n):
await asyncio.sleep(0.1)
return n

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
futures = [executor.submit(invoke_in_new_event_loop, coro(i)) for i in range(5)]
results = [future.result() for future in futures]
assert results == [0, 1, 2, 3, 4] # Assert that all concurrent invocations return correct results

def test_invoke_in_new_event_loop__nested_calls(self): # Test nested calls to the helper function
async def coro(n):
if n <= 0:
return n
else:
return invoke_in_new_event_loop(coro(n - 1)) + 1

result = invoke_in_new_event_loop(coro(5))
assert result == 5 # Assert that nested invocations return the correct cumulative result


def test_invoke_in_new_event_loop__multiple_invocations(self): # Test multiple successive invocations
async def coro():
return 42

for _ in range(10):
assert invoke_in_new_event_loop(coro()) == 42


0 comments on commit a495ee2

Please sign in to comment.