diff --git a/ros2param/package.xml b/ros2param/package.xml
index c7bda056c..5413b07f2 100644
--- a/ros2param/package.xml
+++ b/ros2param/package.xml
@@ -20,6 +20,10 @@
ament_flake8
ament_pep257
ament_xmllint
+ launch
+ launch_ros
+ launch_testing
+ launch_testing_ros
python3-pytest
diff --git a/ros2param/test/fixtures/parameter_node.py b/ros2param/test/fixtures/parameter_node.py
new file mode 100644
index 000000000..fabe1dfc5
--- /dev/null
+++ b/ros2param/test/fixtures/parameter_node.py
@@ -0,0 +1,46 @@
+# Copyright 2020 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+import rclpy
+from rclpy.parameter import PARAMETER_SEPARATOR_STRING
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = rclpy.create_node('parameter_node')
+ node.declare_parameter('bool_param', True)
+ node.declare_parameter('int_param', 42)
+ node.declare_parameter('double_param', 1.23)
+ node.declare_parameter('str_param', 'Hello World')
+ node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING + 'str_param', 'foo')
+ node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
+ 'bar' + PARAMETER_SEPARATOR_STRING +
+ 'str_param', 'foobar')
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('parameter node stopped cleanly')
+ except BaseException:
+ print('exception in parameter node:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2param/test/test_verb_dump.py b/ros2param/test/test_verb_dump.py
index 76f596b42..ab561d5ac 100644
--- a/ros2param/test/test_verb_dump.py
+++ b/ros2param/test/test_verb_dump.py
@@ -12,75 +12,124 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from io import StringIO
+import contextlib
import os
+import sys
import tempfile
-import threading
import time
import unittest
-from unittest.mock import patch
import xmlrpc
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch_ros.actions import Node
+import launch_testing
+import launch_testing.actions
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
import rclpy
-from rclpy.executors import MultiThreadedExecutor
-from rclpy.parameter import PARAMETER_SEPARATOR_STRING
+from rclpy.utilities import get_available_rmw_implementations
-from ros2cli import cli
from ros2cli.node.strategy import NodeStrategy
TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'
-TEST_TIMEOUT = 5.0
-
-EXPECTED_PARAMETER_FILE = """\
-test_node:
- ros__parameters:
- bool_param: true
- double_param: 1.23
- foo:
- bar:
- str_param: foobar
- str_param: foo
- int_param: 42
- str_param: Hello World
- use_sim_time: false
-"""
+TEST_TIMEOUT = 20.0
+
+EXPECTED_PARAMETER_FILE = (
+ f'{TEST_NODE}:\n'
+ ' ros__parameters:\n'
+ ' bool_param: true\n'
+ ' double_param: 1.23\n'
+ ' foo:\n'
+ ' bar:\n'
+ ' str_param: foobar\n'
+ ' str_param: foo\n'
+ ' int_param: 42\n'
+ ' str_param: Hello World\n'
+ ' use_sim_time: false\n'
+)
+
+# Skip cli tests on Windows while they exhibit pathological behavior
+# https://github.com/ros2/build_farmer/issues/248
+if sys.platform.startswith('win'):
+ pytest.skip(
+ 'CLI tests can block for a pathological amount of time on Windows.',
+ allow_module_level=True)
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation):
+ path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
+ additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+
+ # Parameter node test fixture
+ path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
+ parameter_node = Node(
+ executable=sys.executable,
+ name=TEST_NODE,
+ namespace=TEST_NAMESPACE,
+ arguments=[path_to_parameter_node_script],
+ additional_env=additional_env
+ )
+
+ return LaunchDescription([
+ # TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=[
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=[
+ parameter_node,
+ launch_testing.actions.ReadyToTest(),
+ ],
+ additional_env=additional_env
+ )
+ ]
+ ),
+ ])
class TestVerbDump(unittest.TestCase):
@classmethod
- def setUpClass(cls):
- cls.context = rclpy.context.Context()
- rclpy.init(context=cls.context)
- cls.node = rclpy.create_node(
- TEST_NODE, namespace=TEST_NAMESPACE, context=cls.context)
-
- cls.executor = MultiThreadedExecutor(context=cls.context, num_threads=2)
- cls.executor.add_node(cls.node)
-
- cls.node.declare_parameter('bool_param', True)
- cls.node.declare_parameter('int_param', 42)
- cls.node.declare_parameter('double_param', 1.23)
- cls.node.declare_parameter('str_param', 'Hello World')
- cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
- 'str_param', 'foo')
- cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
- 'bar' + PARAMETER_SEPARATOR_STRING +
- 'str_param', 'foobar')
-
- # We need both the test node and 'dump'
- # node to be able to spin
- cls.exec_thread = threading.Thread(target=cls.executor.spin)
- cls.exec_thread.start()
-
- @classmethod
- def tearDownClass(cls):
- cls.executor.shutdown()
- cls.node.destroy_node()
- rclpy.shutdown(context=cls.context)
- cls.exec_thread.join()
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation
+ ):
+ rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
+ filtered_rmw_implementation=rmw_implementation
+ )
+
+ @contextlib.contextmanager
+ def launch_param_dump_command(self, arguments):
+ param_dump_command_action = ExecuteProcess(
+ cmd=['ros2', 'param', 'dump', *arguments],
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ },
+ name='ros2param-dump-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, param_dump_command_action, proc_info, proc_output,
+ output_filter=rmw_implementation_filter
+ ) as param_dump_command:
+ yield param_dump_command
+ cls.launch_param_dump_command = launch_param_dump_command
def setUp(self):
# Ensure the daemon node is running and discovers the test node
@@ -110,52 +159,80 @@ def setUp(self):
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')
def _output_file(self):
-
- ns = self.node.get_namespace()
- name = self.node.get_name()
- if '/' == ns:
- fqn = ns + name
- else:
- fqn = ns + '/' + name
- return fqn[1:].replace('/', '__') + '.yaml'
+ return f'{TEST_NAMESPACE}/{TEST_NODE}'.lstrip('/').replace('/', '__') + '.yaml'
def test_verb_dump_invalid_node(self):
- assert cli.main(
- argv=['param', 'dump', 'invalid_node']) == 'Node not found'
-
- assert cli.main(
- argv=['param', 'dump', 'invalid_ns/test_node']) == 'Node not found'
+ with self.launch_param_dump_command(arguments=['invalid_node']) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Node not found'],
+ text=param_dump_command.output,
+ strict=True
+ )
+ with self.launch_param_dump_command(
+ arguments=[f'invalid_ns/{TEST_NODE}']
+ ) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Node not found'],
+ text=param_dump_command.output,
+ strict=True
+ )
def test_verb_dump_invalid_path(self):
- assert cli.main(
- argv=['param', 'dump', 'foo/test_node', '--output-dir', 'invalid_path']) \
- == "'invalid_path' is not a valid directory."
+ with self.launch_param_dump_command(
+ arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', 'invalid_path']
+ ) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=["'invalid_path' is not a valid directory."],
+ text=param_dump_command.output,
+ strict=True
+ )
def test_verb_dump(self):
with tempfile.TemporaryDirectory() as tmpdir:
- assert cli.main(
- argv=['param', 'dump', '/foo/test_node', '--output-dir', tmpdir]) is None
-
+ with self.launch_param_dump_command(
+ arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir]
+ ) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[''],
+ text=param_dump_command.output,
+ strict=True
+ )
# Compare generated parameter file against expected
generated_param_file = os.path.join(tmpdir, self._output_file())
assert (open(generated_param_file, 'r').read() == EXPECTED_PARAMETER_FILE)
def test_verb_dump_print(self):
- with patch('sys.stdout', new=StringIO()) as fake_stdout:
- assert cli.main(
- argv=['param', 'dump', 'foo/test_node', '--print']) is None
-
- # Compare generated stdout against expected
- assert fake_stdout.getvalue().strip() == EXPECTED_PARAMETER_FILE[:-1]
-
+ with self.launch_param_dump_command(
+ arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
+ ) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_text=EXPECTED_PARAMETER_FILE + '\n',
+ text=param_dump_command.output,
+ strict=True
+ )
+ # If both '--output-dir' and '--print' options are provided, ensure it only prints
+ # and no file is written
with tempfile.TemporaryDirectory() as tmpdir:
- assert cli.main(
- argv=['param', 'dump', 'foo/test_node', '--output-dir', tmpdir, '--print']) is None
-
- not_generated_param_file = os.path.join(tmpdir, self._output_file())
-
- with self.assertRaises(OSError) as context:
- open(not_generated_param_file, 'r')
-
+ with self.launch_param_dump_command(
+ arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir, '--print']
+ ) as param_dump_command:
+ assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
+ assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_text=EXPECTED_PARAMETER_FILE + '\n',
+ text=param_dump_command.output,
+ strict=True
+ )
# Make sure the file was not create, thus '--print' did preempt
- assert '[Errno 2] No such file or directory' in str(context.exception)
+ not_generated_param_file = os.path.join(tmpdir, self._output_file())
+ assert not os.path.exists(not_generated_param_file)