Skip to content

Commit

Permalink
Eloquent patch release 1 (#370)
Browse files Browse the repository at this point in the history
* Handle case where output buffer is closed during shutdown (#365)

* Handle case where output buffer is closed during shutdown

  - Prevent crash during launch shutdown when a process IO event happens
    after the buffers have been closed
  - Use unbuffered output in that case so IO still has a chance of being seen

Signed-off-by: Pete Baughman <[email protected]>

* Address MR feedback

Signed-off-by: Pete Baughman <[email protected]>
Signed-off-by: Ivan Santiago Paunovic <[email protected]>

* Import test file without contaminating sys.modules (#360)

Signed-off-by: Pete Baughman <[email protected]>
Signed-off-by: Ivan Santiago Paunovic <[email protected]>

* Release loop lock before waiting for it to do work (#369)

Main thread can be blocked trying to acquire
__loop_from_run_thread_lock while emit_event() in another thread is
holding that lock and waiting for the main thread to emit the event.
This change releases the lock before blocking.

Signed-off-by: Shane Loretz <[email protected]>
Signed-off-by: Ivan Santiago Paunovic <[email protected]>

Co-authored-by: Peter Baughman <[email protected]>
Co-authored-by: Shane Loretz <[email protected]>
  • Loading branch information
3 people authored Jan 21, 2020
1 parent 2f32b9f commit bd47290
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 35 deletions.
76 changes: 46 additions & 30 deletions launch/launch/actions/execute_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,40 +426,56 @@ def __on_process_stdin(
def __on_process_stdout(
self, event: ProcessIO
) -> Optional[SomeActionsType]:
self.__stdout_buffer.write(event.text.decode(errors='replace'))
self.__stdout_buffer.seek(0)
last_line = None
for line in self.__stdout_buffer:
if line.endswith(os.linesep):
self.__stdout_logger.info(
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
)
else:
last_line = line
break
self.__stdout_buffer.seek(0)
self.__stdout_buffer.truncate(0)
if last_line is not None:
self.__stdout_buffer.write(last_line)
to_write = event.text.decode(errors='replace')
if self.__stdout_buffer.closed:
# __stdout_buffer was probably closed by __flush_buffers on shutdown. Output without
# buffering.
self.__stdout_logger.info(
self.__output_format.format(line=to_write, this=self)
)
else:
self.__stdout_buffer.write(to_write)
self.__stdout_buffer.seek(0)
last_line = None
for line in self.__stdout_buffer:
if line.endswith(os.linesep):
self.__stdout_logger.info(
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
)
else:
last_line = line
break
self.__stdout_buffer.seek(0)
self.__stdout_buffer.truncate(0)
if last_line is not None:
self.__stdout_buffer.write(last_line)

def __on_process_stderr(
self, event: ProcessIO
) -> Optional[SomeActionsType]:
self.__stderr_buffer.write(event.text.decode(errors='replace'))
self.__stderr_buffer.seek(0)
last_line = None
for line in self.__stderr_buffer:
if line.endswith(os.linesep):
self.__stderr_logger.info(
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
)
else:
last_line = line
break
self.__stderr_buffer.seek(0)
self.__stderr_buffer.truncate(0)
if last_line is not None:
self.__stderr_buffer.write(last_line)
to_write = event.text.decode(errors='replace')
if self.__stderr_buffer.closed:
# __stderr buffer was probably closed by __flush_buffers on shutdown. Output without
# buffering.
self.__stderr_logger.info(
self.__output_format.format(line=to_write, this=self)
)
else:
self.__stderr_buffer.write(to_write)
self.__stderr_buffer.seek(0)
last_line = None
for line in self.__stderr_buffer:
if line.endswith(os.linesep):
self.__stderr_logger.info(
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
)
else:
last_line = line
break
self.__stderr_buffer.seek(0)
self.__stderr_buffer.truncate(0)
if last_line is not None:
self.__stderr_buffer.write(last_line)

def __flush_buffers(self, event, context):
with self.__stdout_buffer as buf:
Expand Down
6 changes: 5 additions & 1 deletion launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,22 @@ def emit_event(self, event: Event) -> None:
If the LaunchService is not running, the event is queued until it is.
"""
future = None
with self.__loop_from_run_thread_lock:
if self.__loop_from_run_thread is not None:
# loop is in use, asynchronously emit the event
future = asyncio.run_coroutine_threadsafe(
self.__context.emit_event(event),
self.__loop_from_run_thread
)
future.result()
else:
# loop is not in use, synchronously emit the event, and it will be processed later
self.__context.emit_event_sync(event)

if future is not None:
# Block until asynchronously emitted event is emitted by loop
future.result()

def include_launch_description(self, launch_description: LaunchDescription) -> None:
"""
Evaluate a given LaunchDescription and visits all of its entities.
Expand Down
9 changes: 5 additions & 4 deletions launch_testing/launch_testing/launch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import argparse
from importlib.machinery import SourceFileLoader
import importlib.util
import logging
import os
import sys
Expand All @@ -28,9 +28,10 @@

def _load_python_file_as_module(test_module_name, python_file_path):
"""Load a given Python launch file (by path) as a Python module."""
# Taken from launch_testing to not introduce a weird dependency thing
loader = SourceFileLoader(test_module_name, python_file_path)
return loader.load_module()
spec = importlib.util.spec_from_file_location(test_module_name, python_file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module


def add_arguments(parser):
Expand Down

0 comments on commit bd47290

Please sign in to comment.