From f86971ab60a69a4bef5b65a476c7e348b6d41483 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 25 Jul 2022 16:45:05 -0300 Subject: [PATCH 01/17] Kill dangling subprocesses Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 42 ++++++++++++++++++++++++++ launch/package.xml | 1 + 2 files changed, 43 insertions(+) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 553d31120..3f3f9cb97 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -31,6 +31,8 @@ from typing import Tuple # noqa: F401 from typing import Union +import psutil + import launch.logging from osrf_pycommon.process_utils import async_execute_process @@ -87,6 +89,8 @@ def __init__( 'sigterm_timeout', default=5), sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration( 'sigkill_timeout', default=5), + sigkill_subprocesses_timeout: SomeSubstitutionsType = LaunchConfiguration( + 'sigkill_subprocesses_timeout', default=5), emulate_tty: bool = False, output: SomeSubstitutionsType = 'log', output_format: Text = '[{this.process_description.final_name}] {line}', @@ -158,6 +162,11 @@ def __init__( as a string or a list of strings and Substitutions to be resolved at runtime, defaults to the LaunchConfiguration called 'sigkill_timeout' + :param: sigkill_subprocesses_timeout time until sending SIGKILL directly to dangling + subprocesses after sending SIGKILL to the process, + as a string or a list of strings and Substitutions to be resolved + at runtime, defaults to the LaunchConfiguration called + 'sigkill_subprocesses_timeout' :param: emulate_tty emulate a tty (terminal), defaults to False, but can be overridden with the LaunchConfiguration called 'emulate_tty', the value of which is evaluated as true or false according to @@ -188,6 +197,8 @@ def __init__( self.__shell = shell self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout) self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout) + self.__sigkill_subprocesses_timeout = normalize_to_list_of_substitutions( + sigkill_subprocesses_timeout) self.__emulate_tty = emulate_tty self.__output = os.environ.get('OVERRIDE_LAUNCH_PROCESS_OUTPUT', output) if not isinstance(self.__output, dict): @@ -450,6 +461,11 @@ def printer(context, msg, timeout_substitutions): sigkill_timeout = [PythonExpression( ('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')') )] + sigkill_subprocesses_timeout = [PythonExpression( + ( + 'float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, + ') + float(', *self.__sigkill_subprocesses_timeout, ')') + )] # Setup a timer to send us a SIGTERM if we don't shutdown quickly. self.__sigterm_timer = TimerAction( period=sigterm_timeout, @@ -480,9 +496,35 @@ def printer(context, msg, timeout_substitutions): ], cancel_on_shutdown=False, ) + def kill_subprocesses( + context, + timeout_substitutions, + children=psutil.Process( + self._subprocess_transport.get_pid()).children(recursive=True) + ): + process_name = context.locals.process_name + for p in children: + try: + p.send_signal(signal.SIGKILL) + except psutil.NoSuchProcess: + continue + self.__logger.warn( + f'subprocess[pid={p.pid}] of process[{process_name}] was not terminated after ' + f"'{perform_substitutions(context, timeout_substitutions)}' seconds of parent " + f"being killed. " + 'Sending SIGKILL to subprocess directly.' + ) + self.__sigkill_subprocesses_timer = TimerAction( + period=sigkill_subprocesses_timeout, + actions=[OpaqueFunction( + function=kill_subprocesses, + args=(sigkill_subprocesses_timeout, ))], + cancel_on_shutdown=False, + ) return [ cast(Action, self.__sigterm_timer), cast(Action, self.__sigkill_timer), + cast(Action, self.__sigkill_subprocesses_timer), ] def __get_sigint_event(self): diff --git a/launch/package.xml b/launch/package.xml index b46221a99..1182cb5fe 100644 --- a/launch/package.xml +++ b/launch/package.xml @@ -20,6 +20,7 @@ ament_index_python python3-importlib-metadata python3-lark-parser + python3-psutil python3-yaml ament_copyright From d26860089be0144d44b9cec29b4d631e5a08770e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 28 Sep 2022 16:22:36 -0300 Subject: [PATCH 02/17] Address peer review comments Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 133 +++++++++++++++---------- 1 file changed, 79 insertions(+), 54 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 3f3f9cb97..8ca5c4ad9 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -89,8 +89,8 @@ def __init__( 'sigterm_timeout', default=5), sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration( 'sigkill_timeout', default=5), - sigkill_subprocesses_timeout: SomeSubstitutionsType = LaunchConfiguration( - 'sigkill_subprocesses_timeout', default=5), + signal_lingering_subprocesses: SomeSubstitutionsType = LaunchConfiguration( + 'signal_lingering_subprocesses', default=True), emulate_tty: bool = False, output: SomeSubstitutionsType = 'log', output_format: Text = '[{this.process_description.final_name}] {line}', @@ -162,11 +162,13 @@ def __init__( as a string or a list of strings and Substitutions to be resolved at runtime, defaults to the LaunchConfiguration called 'sigkill_timeout' - :param: sigkill_subprocesses_timeout time until sending SIGKILL directly to dangling - subprocesses after sending SIGKILL to the process, + :param: signal_subprocesses_timeout time until subprocesses start to be signaled directly, as a string or a list of strings and Substitutions to be resolved at runtime, defaults to the LaunchConfiguration called - 'sigkill_subprocesses_timeout' + 'signal_subprocesses_timeout'. + The timer will start to count after the process being executed finishes. + Subprocesses will be killed using the same SIGINT/SIGTERM/SIGKILL sequence + used to kill the executed process. :param: emulate_tty emulate a tty (terminal), defaults to False, but can be overridden with the LaunchConfiguration called 'emulate_tty', the value of which is evaluated as true or false according to @@ -197,8 +199,8 @@ def __init__( self.__shell = shell self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout) self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout) - self.__sigkill_subprocesses_timeout = normalize_to_list_of_substitutions( - sigkill_subprocesses_timeout) + self.__signal_lingering_subprocesses = normalize_to_list_of_substitutions( + signal_lingering_subprocesses) self.__emulate_tty = emulate_tty self.__output = os.environ.get('OVERRIDE_LAUNCH_PROCESS_OUTPUT', output) if not isinstance(self.__output, dict): @@ -218,6 +220,7 @@ def __init__( self.__shutdown_future = None # type: Optional[asyncio.Future] self.__sigterm_timer = None # type: Optional[TimerAction] self.__sigkill_timer = None # type: Optional[TimerAction] + self.__children: List[psutil.Process] = [] self.__stdout_buffer = io.StringIO() self.__stderr_buffer = io.StringIO() @@ -290,8 +293,12 @@ def _shutdown_process(self, context, *, send_sigint): self.__shutdown_future.set_result(None) # Otherwise process is still running, start the shutdown procedures. - context.extend_locals({'process_name': self.process_details['name']}) - actions_to_return = self.__get_shutdown_timer_actions() + context.extend_locals( + { + 'process_name': self.process_details['name'], + 'process_pid': self.process_details['pid'], + }) + actions_to_return = self.__get_shutdown_timer_actions(context) if send_sigint: actions_to_return.append(self.__get_sigint_event()) return actions_to_return @@ -367,7 +374,7 @@ def __on_process_output( if buffer.closed: # buffer was probably closed by __flush_buffers on shutdown. Output without # buffering. - buffer.info( + logger.info( self.__output_format.format(line=to_write, this=self) ) else: @@ -447,32 +454,21 @@ def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeAc send_sigint=not due_to_sigint or context.noninteractive, ) - def __get_shutdown_timer_actions(self) -> List[Action]: + def __get_shutdown_timer_actions(self, context) -> List[Action]: base_msg = \ "process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'" - def printer(context, msg, timeout_substitutions): - self.__logger.error(msg.format( - context.locals.process_name, - perform_substitutions(context, timeout_substitutions), - )) + def printer(context, msg): + self.__logger.error(msg.format(context.locals.process_name)) - sigterm_timeout = self.__sigterm_timeout - sigkill_timeout = [PythonExpression( - ('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')') - )] - sigkill_subprocesses_timeout = [PythonExpression( - ( - 'float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, - ') + float(', *self.__sigkill_subprocesses_timeout, ')') - )] # Setup a timer to send us a SIGTERM if we don't shutdown quickly. + sigterm_timeout = self.__sigterm_timeout_value self.__sigterm_timer = TimerAction( period=sigterm_timeout, actions=[ OpaqueFunction( function=printer, - args=(base_msg.format('{}', '{}', 'SIGINT', 'SIGTERM'), sigterm_timeout) + args=(base_msg.format('{}', sigterm_timeout, 'SIGINT', 'SIGTERM'), ) ), EmitEvent(event=SignalProcess( signal_number=signal.SIGTERM, @@ -481,13 +477,14 @@ def printer(context, msg, timeout_substitutions): ], cancel_on_shutdown=False, ) + sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value # Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM. self.__sigkill_timer = TimerAction( period=sigkill_timeout, actions=[ OpaqueFunction( function=printer, - args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout) + args=(base_msg.format('{}', sigkill_timeout, 'SIGTERM', 'SIGKILL'), ) ), EmitEvent(event=SignalProcess( signal_number='SIGKILL', @@ -496,35 +493,16 @@ def printer(context, msg, timeout_substitutions): ], cancel_on_shutdown=False, ) - def kill_subprocesses( - context, - timeout_substitutions, - children=psutil.Process( - self._subprocess_transport.get_pid()).children(recursive=True) - ): - process_name = context.locals.process_name - for p in children: - try: - p.send_signal(signal.SIGKILL) - except psutil.NoSuchProcess: - continue - self.__logger.warn( - f'subprocess[pid={p.pid}] of process[{process_name}] was not terminated after ' - f"'{perform_substitutions(context, timeout_substitutions)}' seconds of parent " - f"being killed. " - 'Sending SIGKILL to subprocess directly.' - ) - self.__sigkill_subprocesses_timer = TimerAction( - period=sigkill_subprocesses_timeout, - actions=[OpaqueFunction( - function=kill_subprocesses, - args=(sigkill_subprocesses_timeout, ))], - cancel_on_shutdown=False, - ) + self.__children = psutil.Process( + self._subprocess_transport.get_pid()).children(recursive=True) + # process_name = context.locals.process_name + # process_pid = context.locals.process_pid + # log_prefix_format = f'subprocess[pid={{}}] of process[{process_name}, pid={process_pid}]: ' + + # context.asyncio_loop.create_task(_signal_subprocesses()) return [ cast(Action, self.__sigterm_timer), cast(Action, self.__sigkill_timer), - cast(Action, self.__sigkill_subprocesses_timer), ] def __get_sigint_event(self): @@ -571,6 +549,46 @@ def on_stdout_received(self, data: bytes) -> None: def on_stderr_received(self, data: bytes) -> None: self.__context.emit_event_sync(ProcessStderr(text=data, **self.__process_event_args)) + async def _signal_subprocesses(self, context): + to_signal = self.__children + signaled = [] + sig = signal.SIGINT + start_time = context.asyncio_loop.time() + sigterm_timeout = self.__sigterm_timeout_value + sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value + process_pid = self.process_details['pid'] + process_name = self.process_details['name'] + log_prefix_format = ( + 'subprocess[pid={}] of process[' + f'{process_name}, pid={process_pid}]: ') + # signal_subprocesses_timeout_v = perform_substitutions( + # context, signal_subprocesses_timeout) + next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout))) + while True: + for p in to_signal: + try: + p.send_signal(sig) + except psutil.NoSuchProcess: + pass + log_prefix = log_prefix_format.format(p.pid) + self.__logger.info( + f'{log_prefix}sending {sig.name} to subprocess directly.' + ) + signaled.append(p) + try: + sig, timeout = next(next_signals) + except StopIteration: + return + while context.asyncio_loop.time() < start_time + timeout: + await asyncio.sleep(0.5) + for p in list(signaled): + log_prefix = log_prefix_format.format(p.pid) + if not p.is_running(): + self.__logger.info(f'{log_prefix} exited') + signaled.remove(p) + to_signal = signaled + signaled = [] + async def __execute_process(self, context: LaunchContext) -> None: process_event_args = self.__process_event_args if process_event_args is None: @@ -638,8 +656,11 @@ async def __execute_process(self, context: LaunchContext) -> None: timeout=self.__respawn_delay ) if not self.__shutdown_future.done(): - context.asyncio_loop.create_task(self.__execute_process(context)) + context.asyncio_loop.create_task( + self.__execute_process(context)) return + if self.__signal_lingering_subprocesses_value: + await self._signal_subprocesses(context) self.__cleanup() def prepare(self, context: LaunchContext): @@ -720,6 +741,10 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti ] for event_handler in event_handlers: context.register_event_handler(event_handler) + self.__sigterm_timeout_value = perform_typed_substitution(context, self.__sigterm_timeout, float) + self.__sigkill_timeout_value = perform_typed_substitution(context, self.__sigkill_timeout, float) + self.__signal_lingering_subprocesses_value = perform_typed_substitution( + context, self.__signal_lingering_subprocesses, bool) try: self.__completed_future = create_future(context.asyncio_loop) From b5a925f078b2a10b23174b590e878bd8e0f2a555 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 11:53:38 -0300 Subject: [PATCH 03/17] Fix docstring Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 8ca5c4ad9..1ca81d5f4 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -162,13 +162,11 @@ def __init__( as a string or a list of strings and Substitutions to be resolved at runtime, defaults to the LaunchConfiguration called 'sigkill_timeout' - :param: signal_subprocesses_timeout time until subprocesses start to be signaled directly, - as a string or a list of strings and Substitutions to be resolved - at runtime, defaults to the LaunchConfiguration called - 'signal_subprocesses_timeout'. - The timer will start to count after the process being executed finishes. - Subprocesses will be killed using the same SIGINT/SIGTERM/SIGKILL sequence - used to kill the executed process. + :param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process + will be signaled to make sure they finish. + The sequence of signals used is the same SIGINT/SIGTERM/SIGKILL sequence + used to kill the main process. + Subprocesses start being signaled when the main process completes. :param: emulate_tty emulate a tty (terminal), defaults to False, but can be overridden with the LaunchConfiguration called 'emulate_tty', the value of which is evaluated as true or false according to From e2b1a3c9b4aeb9cc5287193498cae73d4e503a71 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 11:56:14 -0300 Subject: [PATCH 04/17] Remove leftover comments Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 1ca81d5f4..fe9694ca2 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -493,11 +493,7 @@ def printer(context, msg): ) self.__children = psutil.Process( self._subprocess_transport.get_pid()).children(recursive=True) - # process_name = context.locals.process_name - # process_pid = context.locals.process_pid - # log_prefix_format = f'subprocess[pid={{}}] of process[{process_name}, pid={process_pid}]: ' - # context.asyncio_loop.create_task(_signal_subprocesses()) return [ cast(Action, self.__sigterm_timer), cast(Action, self.__sigkill_timer), From 8466b4979d7ccfad9bb98217871aaa8d3205dc9b Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 12:06:56 -0300 Subject: [PATCH 05/17] Undo unneeded change Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index fe9694ca2..bcce41e3d 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -650,8 +650,7 @@ async def __execute_process(self, context: LaunchContext) -> None: timeout=self.__respawn_delay ) if not self.__shutdown_future.done(): - context.asyncio_loop.create_task( - self.__execute_process(context)) + context.asyncio_loop.create_task(self.__execute_process(context)) return if self.__signal_lingering_subprocesses_value: await self._signal_subprocesses(context) From 235622338bc52d0702ae63238930a921ac211195 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 12:09:25 -0300 Subject: [PATCH 06/17] Cleanup subprocesses before respawning Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index bcce41e3d..a02d891d3 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -650,6 +650,8 @@ async def __execute_process(self, context: LaunchContext) -> None: timeout=self.__respawn_delay ) if not self.__shutdown_future.done(): + if self.__signal_lingering_subprocesses_value: + await self._signal_subprocesses(context) context.asyncio_loop.create_task(self.__execute_process(context)) return if self.__signal_lingering_subprocesses_value: From ac6afd34b67557ee612683f7d34eb6986272d31e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 15:27:01 -0300 Subject: [PATCH 07/17] please flake8 Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index a02d891d3..c1dc0a89e 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -31,13 +31,13 @@ from typing import Tuple # noqa: F401 from typing import Union -import psutil - import launch.logging from osrf_pycommon.process_utils import async_execute_process from osrf_pycommon.process_utils import AsyncSubprocessProtocol +import psutil + from .emit_event import EmitEvent from .opaque_function import OpaqueFunction from .timer_action import TimerAction @@ -66,9 +66,7 @@ from ..launch_description_entity import LaunchDescriptionEntity from ..some_actions_type import SomeActionsType from ..some_substitutions_type import SomeSubstitutionsType -from ..substitution import Substitution # noqa: F401 from ..substitutions import LaunchConfiguration -from ..substitutions import PythonExpression from ..utilities import create_future from ..utilities import is_a_subclass from ..utilities import normalize_to_list_of_substitutions @@ -736,8 +734,10 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti ] for event_handler in event_handlers: context.register_event_handler(event_handler) - self.__sigterm_timeout_value = perform_typed_substitution(context, self.__sigterm_timeout, float) - self.__sigkill_timeout_value = perform_typed_substitution(context, self.__sigkill_timeout, float) + self.__sigterm_timeout_value = perform_typed_substitution( + context, self.__sigterm_timeout, float) + self.__sigkill_timeout_value = perform_typed_substitution( + context, self.__sigkill_timeout, float) self.__signal_lingering_subprocesses_value = perform_typed_substitution( context, self.__signal_lingering_subprocesses, bool) From 7ffee9aef4bf0656557ce82813568c07e13a7e6e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 6 Oct 2022 15:54:27 -0300 Subject: [PATCH 08/17] Fix issue detected in tests Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index c1dc0a89e..8e04a4da2 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -489,8 +489,11 @@ def printer(context, msg): ], cancel_on_shutdown=False, ) - self.__children = psutil.Process( - self._subprocess_transport.get_pid()).children(recursive=True) + try: + self.__children = psutil.Process( + self._subprocess_transport.get_pid()).children(recursive=True) + except psutil.NoSuchProcess: + self.__children = [] return [ cast(Action, self.__sigterm_timer), From 6432d76ae332082102bcc9bf3f4fb0536ee5dcdc Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 13 Oct 2022 09:26:51 -0300 Subject: [PATCH 09/17] Improve error handling Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 8e04a4da2..064536be7 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -489,12 +489,13 @@ def printer(context, msg): ], cancel_on_shutdown=False, ) - try: - self.__children = psutil.Process( - self._subprocess_transport.get_pid()).children(recursive=True) - except psutil.NoSuchProcess: - self.__children = [] - + self.__children = [] + pid = self._subprocess_transport.get_pid() + if pid is not None: + try: + self.__children = psutil.Process(pid).children(recursive=True) + except psutil.NoSuchProcess: + pass return [ cast(Action, self.__sigterm_timer), cast(Action, self.__sigkill_timer), From 6284c82b601cefefda4f79bef3a002e0c0f00e10 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 1 Dec 2022 15:33:14 -0300 Subject: [PATCH 10/17] Address peer review comments Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 064536be7..9c7691389 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -557,8 +557,6 @@ async def _signal_subprocesses(self, context): log_prefix_format = ( 'subprocess[pid={}] of process[' f'{process_name}, pid={process_pid}]: ') - # signal_subprocesses_timeout_v = perform_substitutions( - # context, signal_subprocesses_timeout) next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout))) while True: for p in to_signal: @@ -575,13 +573,15 @@ async def _signal_subprocesses(self, context): sig, timeout = next(next_signals) except StopIteration: return - while context.asyncio_loop.time() < start_time + timeout: - await asyncio.sleep(0.5) + current_time = context.asyncio_loop.time() + while current_time < start_time + timeout: + await asyncio.sleep(min(0.5, start_time + timeout - current_time)) for p in list(signaled): - log_prefix = log_prefix_format.format(p.pid) if not p.is_running(): - self.__logger.info(f'{log_prefix} exited') + log_prefix = log_prefix_format.format(p.pid) + self.__logger.info(f'{log_prefix}exited') signaled.remove(p) + current_time = context.asyncio_loop.time() to_signal = signaled signaled = [] From 514c149639eafe68013353862606d6311c7c974e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 1 Dec 2022 15:48:55 -0300 Subject: [PATCH 11/17] Remove unnecessary argument in __get_shutdown_timer_actions() Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 9c7691389..c0c31ca07 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -294,7 +294,7 @@ def _shutdown_process(self, context, *, send_sigint): 'process_name': self.process_details['name'], 'process_pid': self.process_details['pid'], }) - actions_to_return = self.__get_shutdown_timer_actions(context) + actions_to_return = self.__get_shutdown_timer_actions() if send_sigint: actions_to_return.append(self.__get_sigint_event()) return actions_to_return @@ -450,7 +450,7 @@ def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeAc send_sigint=not due_to_sigint or context.noninteractive, ) - def __get_shutdown_timer_actions(self, context) -> List[Action]: + def __get_shutdown_timer_actions(self)-> List[Action]: base_msg = \ "process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'" From 5b43cdcc50108a60fbbb4b56d3eb9d88b91c52f7 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 5 Dec 2022 12:34:06 -0300 Subject: [PATCH 12/17] Add test case Signed-off-by: Ivan Santiago Paunovic --- launch/test/launch/test_execute_local.py | 41 ++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/launch/test/launch/test_execute_local.py b/launch/test/launch/test_execute_local.py index 24b5f26b6..a4bd6af0f 100644 --- a/launch/test/launch/test_execute_local.py +++ b/launch/test/launch/test_execute_local.py @@ -17,8 +17,12 @@ """Tests for the ExecuteLocal Action.""" +import asyncio import os +import psutil +import signal import sys +import time from launch import LaunchDescription from launch import LaunchService @@ -28,6 +32,8 @@ from launch.actions import TimerAction from launch.descriptions import Executable +import osrf_pycommon.process_utils + import pytest @@ -138,3 +144,38 @@ def test_execute_process_with_output_dictionary(): ls = LaunchService() ls.include_launch_description(ld) assert 0 == ls.run() + + +PYTHON_SCRIPT="""\ +import time + +while 1: + time.sleep(0.5) +""" + + +def test_kill_subprocesses(): + """Test launching a process with an environment variable.""" + executable = ExecuteLocal( + process_description=Executable( + cmd=['python3', '-c', f'"{PYTHON_SCRIPT}"'], + ), + shell=True, + output='screen', + ) + ld = LaunchDescription([executable]) + ls = LaunchService() + ls.include_launch_description(ld) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + run_async_task = loop.create_task(ls.run_async()) + async def wait_for_subprocesses(): + start = time.time() + while len(psutil.Process().children(recursive=True)) != 2: + await asyncio.sleep(0.5) + assert time.time() < start + 5., 'timed out waiting for processes to setup' + wait_for_subprocesses_task = loop.create_task(wait_for_subprocesses()) + loop.run_until_complete(wait_for_subprocesses_task) + os.kill(executable.process_details['pid'], signal.SIGTERM) + loop.run_until_complete(run_async_task) + assert len(psutil.Process().children(recursive=True)) == 0 From 4b796ccd2194a57ab00dc92a0134f1047c7bc4a0 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 14 Dec 2022 17:29:46 -0300 Subject: [PATCH 13/17] Fix flake8 failures Signed-off-by: Ivan Santiago Paunovic --- launch/test/launch/test_execute_local.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/launch/test/launch/test_execute_local.py b/launch/test/launch/test_execute_local.py index a4bd6af0f..7e0c59594 100644 --- a/launch/test/launch/test_execute_local.py +++ b/launch/test/launch/test_execute_local.py @@ -32,8 +32,6 @@ from launch.actions import TimerAction from launch.descriptions import Executable -import osrf_pycommon.process_utils - import pytest @@ -146,7 +144,7 @@ def test_execute_process_with_output_dictionary(): assert 0 == ls.run() -PYTHON_SCRIPT="""\ +PYTHON_SCRIPT = """\ import time while 1: @@ -169,6 +167,7 @@ def test_kill_subprocesses(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) run_async_task = loop.create_task(ls.run_async()) + async def wait_for_subprocesses(): start = time.time() while len(psutil.Process().children(recursive=True)) != 2: From 58cd6610687cbe4282402ff4b68ca971900ec458 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 14 Dec 2022 17:34:33 -0300 Subject: [PATCH 14/17] More flake8 failures Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 2 +- launch/test/launch/test_execute_local.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index c0c31ca07..14a73ff56 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -450,7 +450,7 @@ def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeAc send_sigint=not due_to_sigint or context.noninteractive, ) - def __get_shutdown_timer_actions(self)-> List[Action]: + def __get_shutdown_timer_actions(self) -> List[Action]: base_msg = \ "process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'" diff --git a/launch/test/launch/test_execute_local.py b/launch/test/launch/test_execute_local.py index 7e0c59594..14dc1cd00 100644 --- a/launch/test/launch/test_execute_local.py +++ b/launch/test/launch/test_execute_local.py @@ -19,7 +19,6 @@ import asyncio import os -import psutil import signal import sys import time @@ -32,6 +31,8 @@ from launch.actions import TimerAction from launch.descriptions import Executable +import psutil + import pytest From 5ff2f05f983d0c866b485b35a1eb737163edc51e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Tue, 20 Dec 2022 14:18:37 -0300 Subject: [PATCH 15/17] Seems to fix CI issues... Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 14a73ff56..361be0454 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -656,9 +656,9 @@ async def __execute_process(self, context: LaunchContext) -> None: await self._signal_subprocesses(context) context.asyncio_loop.create_task(self.__execute_process(context)) return + self.__cleanup() if self.__signal_lingering_subprocesses_value: await self._signal_subprocesses(context) - self.__cleanup() def prepare(self, context: LaunchContext): """Prepare the action for execution.""" From b5ba458b76ab6ea89252e2f6c1865afcba9e1f86 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Tue, 20 Dec 2022 15:30:06 -0300 Subject: [PATCH 16/17] Second try Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 361be0454..a2b741163 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -507,12 +507,15 @@ def __get_sigint_event(self): process_matcher=matches_action(self), )) - def __cleanup(self): - # Cancel any pending timers we started. + def __cleanup_timers(self): if self.__sigterm_timer is not None: self.__sigterm_timer.cancel() if self.__sigkill_timer is not None: self.__sigkill_timer.cancel() + + def __cleanup(self): + # Cancel any pending timers we started. + self.__cleanup_timers() # Close subprocess transport if any. if self._subprocess_transport is not None: self._subprocess_transport.close() @@ -656,9 +659,10 @@ async def __execute_process(self, context: LaunchContext) -> None: await self._signal_subprocesses(context) context.asyncio_loop.create_task(self.__execute_process(context)) return - self.__cleanup() + self.__cleanup_timers() if self.__signal_lingering_subprocesses_value: await self._signal_subprocesses(context) + self.__cleanup() def prepare(self, context: LaunchContext): """Prepare the action for execution.""" From fc97730dd93ced38a25be9768b4d5823f4d0f63e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Tue, 20 Dec 2022 15:58:33 -0300 Subject: [PATCH 17/17] another try Signed-off-by: Ivan Santiago Paunovic --- launch/launch/actions/execute_local.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index a2b741163..a6f0bfd35 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -566,7 +566,7 @@ async def _signal_subprocesses(self, context): try: p.send_signal(sig) except psutil.NoSuchProcess: - pass + continue log_prefix = log_prefix_format.format(p.pid) self.__logger.info( f'{log_prefix}sending {sig.name} to subprocess directly.' @@ -584,6 +584,8 @@ async def _signal_subprocesses(self, context): log_prefix = log_prefix_format.format(p.pid) self.__logger.info(f'{log_prefix}exited') signaled.remove(p) + if not signaled: + return current_time = context.asyncio_loop.time() to_signal = signaled signaled = []