Skip to content

Commit

Permalink
move resetting events to setup() instead of _read_events()
Browse files Browse the repository at this point in the history
previously this was in _read_events because it's a coroutine that will have the correct event loop. however, _read_events actually gets created in a task, which can run *after* the first mux.read call by setup. since setup is now the first async entrypoint in worker and in tests, we can safely move it there

Signed-off-by: technillogue <[email protected]>
  • Loading branch information
technillogue committed Feb 2, 2024
1 parent ffaf2ce commit 7559564
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions python/cog/server/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ def setup(self) -> AsyncIterator[_PublicEventType]:
self._state = WorkerState.STARTING

async def inner() -> AsyncIterator[_PublicEventType]:
# in 3.10 Event started doing get_running_loop
# previously it stored the loop when created, which causes an error in tests
if sys.version_info < (3, 10):
self._terminating = self._mux.terminating = asyncio.Event()

self._child.start()
self._ensure_event_reader()
async for event in self._mux.read("SETUP", poll=0.1):
Expand Down Expand Up @@ -228,12 +233,6 @@ def handle_error(task: "asyncio.Task[None]") -> None:
self._read_events_task.add_done_callback(handle_error)

async def _read_events(self) -> None:
# in 3.10 Event started doing get_running_loop
# previously it stored the loop when created, which causes an error in tests
if sys.version_info < (3, 10):
self._terminating = asyncio.Event()
self._mux.terminating = self._terminating

while self._child.is_alive() and not self._terminating.is_set():
# this can still be running when the task is destroyed
result = await self._events.coro_recv_with_exit(self._terminating)
Expand Down

0 comments on commit 7559564

Please sign in to comment.