From 198faee2d691faa6dd1900bd5a3fbc336745cef3 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 29 Oct 2024 13:42:09 +0100 Subject: [PATCH 1/2] detach io thread output from creation cell --- ipyparallel/client/client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py index 7675c32b..d25c3cee 100644 --- a/ipyparallel/client/client.py +++ b/ipyparallel/client/client.py @@ -7,6 +7,7 @@ import os import re import socket +import sys import time import types import warnings @@ -1078,6 +1079,16 @@ def _io_main(self, start_evt=None): """main loop for background IO thread""" self._io_loop = self._make_io_loop() self._setup_streams() + + # disable ipykernel's association of thread output with the cell that + # spawned the thread. + # there should be a public API for this... + thread_ident = current_thread().ident + for stream in [sys.stdout, sys.stderr]: + for name in ("_thread_to_parent", "_thread_to_parent_header"): + mapping = getattr(stream, name, None) + if mapping: + mapping.pop(thread_ident, None) # signal that start has finished # so that the main thread knows that all our attributes are defined if start_evt: From e534ef1cdf3b91ce23e156c672348a2045b5eefd Mon Sep 17 00:00:00 2001 From: Min RK Date: Wed, 30 Oct 2024 10:43:31 +0100 Subject: [PATCH 2/2] more systematic approach to thread output workaround use a Thread subclass that immediately undoes the output routing after it is applied --- ipyparallel/_async.py | 4 +++- ipyparallel/client/client.py | 14 ++------------ ipyparallel/cluster/launcher.py | 7 ++++--- ipyparallel/util.py | 34 ++++++++++++++++++++++++++++++++- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/ipyparallel/_async.py b/ipyparallel/_async.py index bbf05dce..489f53ca 100644 --- a/ipyparallel/_async.py +++ b/ipyparallel/_async.py @@ -8,6 +8,8 @@ from tornado.ioloop import IOLoop +from ipyparallel.util import _OutputProducingThread as Thread + def _asyncio_run(coro): """Like asyncio.run, but works when there's no event loop""" @@ -41,7 +43,7 @@ def _in_thread(self, async_f, *args, **kwargs): """Run an async function in a background thread""" if self._async_thread is None: self._loop_started = threading.Event() - self._async_thread = threading.Thread(target=self._thread_main, daemon=True) + self._async_thread = Thread(target=self._thread_main, daemon=True) self._async_thread.start() self._loop_started.wait(timeout=5) diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py index d25c3cee..893b5169 100644 --- a/ipyparallel/client/client.py +++ b/ipyparallel/client/client.py @@ -7,7 +7,6 @@ import os import re import socket -import sys import time import types import warnings @@ -16,7 +15,7 @@ from functools import partial from getpass import getpass from pprint import pprint -from threading import Event, Thread, current_thread +from threading import Event, current_thread import jupyter_client.session import zmq @@ -50,6 +49,7 @@ import ipyparallel as ipp from ipyparallel import error, serialize, util from ipyparallel.serialize import PrePickled, Reference +from ipyparallel.util import _OutputProducingThread as Thread from .asyncresult import AsyncHubResult, AsyncResult from .futures import MessageFuture, multi_future @@ -1079,16 +1079,6 @@ def _io_main(self, start_evt=None): """main loop for background IO thread""" self._io_loop = self._make_io_loop() self._setup_streams() - - # disable ipykernel's association of thread output with the cell that - # spawned the thread. - # there should be a public API for this... - thread_ident = current_thread().ident - for stream in [sys.stdout, sys.stderr]: - for name in ("_thread_to_parent", "_thread_to_parent_header"): - mapping = getattr(stream, name, None) - if mapping: - mapping.pop(thread_ident, None) # signal that start has finished # so that the main thread knows that all our attributes are defined if start_evt: diff --git a/ipyparallel/cluster/launcher.py b/ipyparallel/cluster/launcher.py index 23fa5ebe..325858e8 100644 --- a/ipyparallel/cluster/launcher.py +++ b/ipyparallel/cluster/launcher.py @@ -42,6 +42,7 @@ from traitlets.config.configurable import LoggingConfigurable from ..traitlets import entry_points +from ..util import _OutputProducingThread as Thread from ..util import shlex_join from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask from .shellcmd import ShellCommandSend @@ -524,7 +525,7 @@ def _start_waiting(self): # ensure self.loop is accessed on the main thread before waiting self.loop self._stop_waiting = threading.Event() - self._wait_thread = threading.Thread( + self._wait_thread = Thread( target=self._wait, daemon=True, name=f"wait(pid={self.pid})" ) self._wait_thread.start() @@ -583,7 +584,7 @@ def _stream_file(self, path): time.sleep(0.1) def _start_streaming(self): - self._stream_thread = t = threading.Thread( + self._stream_thread = t = Thread( target=partial(self._stream_file, self.output_file), name=f"Stream Output {self.identifier}", daemon=True, @@ -1352,7 +1353,7 @@ def _start_waiting(self): # ensure self.loop is accessed on the main thread before waiting self.loop self._stop_waiting = threading.Event() - self._wait_thread = threading.Thread( + self._wait_thread = Thread( target=self._wait, daemon=True, name=f"wait(host={self.location}, pid={self.pid})", diff --git a/ipyparallel/util.py b/ipyparallel/util.py index 2ecbfbd9..8e6fb259 100644 --- a/ipyparallel/util.py +++ b/ipyparallel/util.py @@ -13,8 +13,9 @@ import sys import warnings from datetime import datetime, timezone -from functools import lru_cache +from functools import lru_cache, partial from signal import SIGABRT, SIGINT, SIGTERM, signal +from threading import Thread, current_thread from types import FunctionType import traitlets @@ -804,3 +805,34 @@ def connect( socket.setsockopt(zmq.CURVE_SECRETKEY, curve_secretkey) socket.setsockopt(zmq.CURVE_PUBLICKEY, curve_publickey) return socket.connect(url) + + +def _detach_thread_output(ident=None): + """undo thread-parent mapping in ipykernel#1186""" + # disable ipykernel's association of thread output with the cell that + # spawned the thread. + # there should be a public API for this... + if ident is None: + ident = current_thread().ident + for stream in (sys.stdout, sys.stderr): + for name in ("_thread_to_parent", "_thread_to_parent_header"): + mapping = getattr(stream, name, None) + if mapping: + mapping.pop(ident, None) + + +class _OutputProducingThread(Thread): + """ + Subclass Thread to workaround bug in ipykernel + associating thread output with wrong Cell + + See https://github.com/ipython/ipykernel/issues/1289 + """ + + def __init__(self, target, **kwargs): + wrapped_target = partial(self._wrapped_target, target) + super().__init__(target=wrapped_target, **kwargs) + + def _wrapped_target(self, target, *args, **kwargs): + _detach_thread_output(self.ident) + return target(*args, **kwargs)