Skip to content

Commit

Permalink
[ros2param] Convert test_verb_dump into launch test (#485)
Browse files Browse the repository at this point in the history
Fixes #480

The actual tests are the same, except with the use of launch_testing we ensure the CLI daemon
is restarted between tests. This follows a similar pattern as the other ros2cli tests.

In addition to converting to launch tests, this change also runs the tests for all RMW implementations.

For now, we are skipping tests on Windows. Other CLI tests are skipped on Windows since #489. To be reverted when ros2/build_farmer#248 is resolved.

Signed-off-by: Jacob Perron <[email protected]>
  • Loading branch information
jacobperron authored Apr 18, 2020
1 parent e1fa7d1 commit 75e0ea1
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 86 deletions.
4 changes: 4 additions & 0 deletions ros2param/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
Expand Down
46 changes: 46 additions & 0 deletions ros2param/test/fixtures/parameter_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys

import rclpy
from rclpy.parameter import PARAMETER_SEPARATOR_STRING


def main(args=None):
rclpy.init(args=args)

node = rclpy.create_node('parameter_node')
node.declare_parameter('bool_param', True)
node.declare_parameter('int_param', 42)
node.declare_parameter('double_param', 1.23)
node.declare_parameter('str_param', 'Hello World')
node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING + 'str_param', 'foo')
node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'bar' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foobar')

try:
rclpy.spin(node)
except KeyboardInterrupt:
print('parameter node stopped cleanly')
except BaseException:
print('exception in parameter node:', file=sys.stderr)
raise
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
249 changes: 163 additions & 86 deletions ros2param/test/test_verb_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,75 +12,124 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
import contextlib
import os
import sys
import tempfile
import threading
import time
import unittest
from unittest.mock import patch
import xmlrpc

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.parameter import PARAMETER_SEPARATOR_STRING
from rclpy.utilities import get_available_rmw_implementations

from ros2cli import cli
from ros2cli.node.strategy import NodeStrategy

TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'

TEST_TIMEOUT = 5.0

EXPECTED_PARAMETER_FILE = """\
test_node:
ros__parameters:
bool_param: true
double_param: 1.23
foo:
bar:
str_param: foobar
str_param: foo
int_param: 42
str_param: Hello World
use_sim_time: false
"""
TEST_TIMEOUT = 20.0

EXPECTED_PARAMETER_FILE = (
f'{TEST_NODE}:\n'
' ros__parameters:\n'
' bool_param: true\n'
' double_param: 1.23\n'
' foo:\n'
' bar:\n'
' str_param: foobar\n'
' str_param: foo\n'
' int_param: 42\n'
' str_param: Hello World\n'
' use_sim_time: false\n'
)

# Skip cli tests on Windows while they exhibit pathological behavior
# https://github.com/ros2/build_farmer/issues/248
if sys.platform.startswith('win'):
pytest.skip(
'CLI tests can block for a pathological amount of time on Windows.',
allow_module_level=True)


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation):
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}

# Parameter node test fixture
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
parameter_node = Node(
executable=sys.executable,
name=TEST_NODE,
namespace=TEST_NAMESPACE,
arguments=[path_to_parameter_node_script],
additional_env=additional_env
)

return LaunchDescription([
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
parameter_node,
launch_testing.actions.ReadyToTest(),
],
additional_env=additional_env
)
]
),
])


class TestVerbDump(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.context = rclpy.context.Context()
rclpy.init(context=cls.context)
cls.node = rclpy.create_node(
TEST_NODE, namespace=TEST_NAMESPACE, context=cls.context)

cls.executor = MultiThreadedExecutor(context=cls.context, num_threads=2)
cls.executor.add_node(cls.node)

cls.node.declare_parameter('bool_param', True)
cls.node.declare_parameter('int_param', 42)
cls.node.declare_parameter('double_param', 1.23)
cls.node.declare_parameter('str_param', 'Hello World')
cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foo')
cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'bar' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foobar')

# We need both the test node and 'dump'
# node to be able to spin
cls.exec_thread = threading.Thread(target=cls.executor.spin)
cls.exec_thread.start()

@classmethod
def tearDownClass(cls):
cls.executor.shutdown()
cls.node.destroy_node()
rclpy.shutdown(context=cls.context)
cls.exec_thread.join()
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=rmw_implementation
)

@contextlib.contextmanager
def launch_param_dump_command(self, arguments):
param_dump_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'dump', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-dump-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_dump_command_action, proc_info, proc_output,
output_filter=rmw_implementation_filter
) as param_dump_command:
yield param_dump_command
cls.launch_param_dump_command = launch_param_dump_command

def setUp(self):
# Ensure the daemon node is running and discovers the test node
Expand Down Expand Up @@ -110,52 +159,80 @@ def setUp(self):
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')

def _output_file(self):

ns = self.node.get_namespace()
name = self.node.get_name()
if '/' == ns:
fqn = ns + name
else:
fqn = ns + '/' + name
return fqn[1:].replace('/', '__') + '.yaml'
return f'{TEST_NAMESPACE}/{TEST_NODE}'.lstrip('/').replace('/', '__') + '.yaml'

def test_verb_dump_invalid_node(self):
assert cli.main(
argv=['param', 'dump', 'invalid_node']) == 'Node not found'

assert cli.main(
argv=['param', 'dump', 'invalid_ns/test_node']) == 'Node not found'
with self.launch_param_dump_command(arguments=['invalid_node']) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_dump_command.output,
strict=True
)
with self.launch_param_dump_command(
arguments=[f'invalid_ns/{TEST_NODE}']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_dump_command.output,
strict=True
)

def test_verb_dump_invalid_path(self):
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--output-dir', 'invalid_path']) \
== "'invalid_path' is not a valid directory."
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', 'invalid_path']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=["'invalid_path' is not a valid directory."],
text=param_dump_command.output,
strict=True
)

def test_verb_dump(self):
with tempfile.TemporaryDirectory() as tmpdir:
assert cli.main(
argv=['param', 'dump', '/foo/test_node', '--output-dir', tmpdir]) is None

with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir]
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[''],
text=param_dump_command.output,
strict=True
)
# Compare generated parameter file against expected
generated_param_file = os.path.join(tmpdir, self._output_file())
assert (open(generated_param_file, 'r').read() == EXPECTED_PARAMETER_FILE)

def test_verb_dump_print(self):
with patch('sys.stdout', new=StringIO()) as fake_stdout:
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--print']) is None

# Compare generated stdout against expected
assert fake_stdout.getvalue().strip() == EXPECTED_PARAMETER_FILE[:-1]

with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=EXPECTED_PARAMETER_FILE + '\n',
text=param_dump_command.output,
strict=True
)
# If both '--output-dir' and '--print' options are provided, ensure it only prints
# and no file is written
with tempfile.TemporaryDirectory() as tmpdir:
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--output-dir', tmpdir, '--print']) is None

not_generated_param_file = os.path.join(tmpdir, self._output_file())

with self.assertRaises(OSError) as context:
open(not_generated_param_file, 'r')

with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir, '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=EXPECTED_PARAMETER_FILE + '\n',
text=param_dump_command.output,
strict=True
)
# Make sure the file was not create, thus '--print' did preempt
assert '[Errno 2] No such file or directory' in str(context.exception)
not_generated_param_file = os.path.join(tmpdir, self._output_file())
assert not os.path.exists(not_generated_param_file)

0 comments on commit 75e0ea1

Please sign in to comment.