diff --git a/sros2/package.xml b/sros2/package.xml
index 2072156e..14261412 100644
--- a/sros2/package.xml
+++ b/sros2/package.xml
@@ -22,6 +22,8 @@
ament_mypy
ament_pep257
python3-pytest
+
+ ros_testing
test_msgs
diff --git a/sros2/test/sros2/commands/security/verbs/fixtures/client_service_node.py b/sros2/test/sros2/commands/security/verbs/fixtures/client_service_node.py
new file mode 100644
index 00000000..520d7ab4
--- /dev/null
+++ b/sros2/test/sros2/commands/security/verbs/fixtures/client_service_node.py
@@ -0,0 +1,51 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from rclpy.node import Node
+
+import test_msgs.srv
+
+
+class ClientServiceNode(Node):
+
+ def __init__(self, name='client_srv_node'):
+ super().__init__(name)
+ self.client = self.create_client(test_msgs.srv.Empty, '~/client')
+ self.service = self.create_service(
+ test_msgs.srv.Empty, '~/server', lambda request, response: response
+ )
+
+ def destroy_node(self):
+ self.client.destroy()
+ self.service.destroy()
+ super().destroy_node()
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = ClientServiceNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('node stopped cleanly')
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/sros2/test/sros2/commands/security/verbs/fixtures/pub_sub_node.py b/sros2/test/sros2/commands/security/verbs/fixtures/pub_sub_node.py
new file mode 100644
index 00000000..ac786101
--- /dev/null
+++ b/sros2/test/sros2/commands/security/verbs/fixtures/pub_sub_node.py
@@ -0,0 +1,54 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from rclpy.node import Node
+
+import test_msgs.msg
+
+
+class PubSubNode(Node):
+
+ def __init__(self, name='pub_sub_node'):
+ super().__init__(name)
+
+ self.publisher = self.create_publisher(
+ test_msgs.msg.Strings, '~/pub', 1
+ )
+ self.subscription = self.create_subscription(
+ test_msgs.msg.Strings, '~/sub', lambda msg: None, 1
+ )
+
+ def destroy_node(self):
+ self.publisher.destroy()
+ self.subscription.destroy()
+ super().destroy_node()
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = PubSubNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('node stopped cleanly')
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/sros2/test/sros2/commands/security/verbs/test_generate_policy.py b/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
index 8d31c7cd..827b7387 100644
--- a/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
+++ b/sros2/test/sros2/commands/security/verbs/test_generate_policy.py
@@ -1,4 +1,5 @@
# Copyright 2019 Canonical Ltd
+# Copyright 2020 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,133 +13,174 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import os
+import sys
import tempfile
+import unittest
+
+from launch_ros.actions import Node
+
+import launch_testing
+from launch_testing.actions import ReadyToTest
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
import pytest
-import rclpy
-from ros2cli import cli
+from rclpy.expand_topic_name import expand_topic_name
+from rclpy.utilities import get_available_rmw_implementations
from sros2.policy import load_policy
-from test_msgs.msg import Strings
-from test_msgs.srv import Empty
-
-
-def test_generate_policy_topics():
- with tempfile.TemporaryDirectory() as tmpdir:
- test_enclave = '/foo/bar'
- test_node_namespace = '/'
- test_name = 'test_generate_policy_topics'
- test_node_name = f'{test_name}_node'
- # Create a test-specific context so that generate_policy can still init
- context = rclpy.Context()
- rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
- node = rclpy.create_node(test_node_name, context=context)
-
- try:
- # Create a publisher and subscription
- node.create_publisher(Strings, f'{test_name}_pub', 1)
- node.create_subscription(Strings, f'{test_name}_sub', lambda msg: None, 1)
-
- # Generate the policy for the running node
- assert cli.main(
- argv=['security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) == 0
- finally:
- node.destroy_node()
- rclpy.shutdown(context=context)
-
- # Load the policy and pull out the allowed publications and subscriptions
- policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
- profile = policy.find(
- path=f'enclaves/enclave[@path="{test_enclave}"]'
- + f'/profiles/profile[@ns="{test_node_namespace}"]'
- + f'[@node="{test_node_name}"]'
- )
- assert profile is not None
- topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
- assert topics_publish_allowed is not None
- topics_subscribe_allowed = profile.find(path='topics[@subscribe="ALLOW"]')
- assert topics_subscribe_allowed is not None
-
- # Verify that the allowed publications include topic_pub and not topic_sub
- topics = topics_publish_allowed.findall('topic')
- assert len([t for t in topics if t.text == f'{test_name}_pub']) == 1
- assert len([t for t in topics if t.text == f'{test_name}_sub']) == 0
-
- # Verify that the allowed subscriptions include topic_sub and not topic_pub
- topics = topics_subscribe_allowed.findall('topic')
- assert len([t for t in topics if t.text == f'{test_name}_sub']) == 1
- assert len([t for t in topics if t.text == f'{test_name}_pub']) == 0
-
-
-def test_generate_policy_services():
- with tempfile.TemporaryDirectory() as tmpdir:
- # Create a test-specific context so that generate_policy can still init
- context = rclpy.Context()
- test_enclave = '/foo'
- test_node_namespace = '/node_ns'
- test_name = 'test_generate_policy_services'
- test_node_name = test_name + '_node'
- rclpy.init(context=context, args=['--ros-args', '-e', test_enclave])
- node = rclpy.create_node(
- test_node_name,
- namespace=test_node_namespace,
- context=context
- )
- try:
- # Create a server and client
- node.create_client(Empty, f'{test_name}_client')
- node.create_service(Empty, f'{test_name}_server', lambda request,
- response: response)
-
- # Generate the policy for the running node
- assert cli.main(
- argv=['security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) == 0
- finally:
- node.destroy_node()
- rclpy.shutdown(context=context)
-
- # Load the policy and pull out allowed replies and requests
- policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
- profile = policy.find(
- path=f'enclaves/enclave[@path="{test_enclave}"]'
- + f'/profiles/profile[@ns="{test_node_namespace}"]'
- + f'[@node="{test_node_name}"]'
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from utilities.sros2_cli_test_case import generate_sros2_cli_test_description # noqa: E402
+from utilities.sros2_cli_test_case import SROS2CLITestCase # noqa: E402
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation,use_daemon', itertools.product(
+ get_available_rmw_implementations(), (True, False)
+))
+def generate_test_description(rmw_implementation, use_daemon):
+ if 'connext' in rmw_implementation and not use_daemon:
+ raise unittest.SkipTest(
+ f'Using {rmw_implementation} w/o a daemon makes tests flaky'
)
- assert profile is not None
- service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
- assert service_reply_allowed is not None
- service_request_allowed = profile.find(path='services[@request="ALLOW"]')
- assert service_request_allowed is not None
-
- # Verify that the allowed replies include service_server and not service_client
- services = service_reply_allowed.findall('service')
- assert len([s for s in services if s.text == f'{test_name}_server']) == 1
- assert len([s for s in services if s.text == f'{test_name}_client']) == 0
-
- # Verify that the allowed requests include service_client and not service_server
- services = service_request_allowed.findall('service')
- assert len([s for s in services if s.text == f'{test_name}_client']) == 1
- assert len([s for s in services if s.text == f'{test_name}_server']) == 0
-
-
-# TODO(jacobperron): On Windows, this test is flakey due to nodes left-over from tests in
-# other packages.
-# See: https://github.com/ros2/sros2/issues/143
-@pytest.mark.skipif(
- 'nt' == os.name, reason='flakey due to nodes left-over from tests in other packages')
-def test_generate_policy_no_nodes(capsys):
- with tempfile.TemporaryDirectory() as tmpdir:
- assert cli.main(argv=[
- 'security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) != 0
- stderr = capsys.readouterr().err.strip()
- assert stderr == 'No nodes detected in the ROS graph. No policy file was generated.'
-
-
-def test_generate_policy_no_policy_file(capsys):
- with pytest.raises(SystemExit) as e:
- cli.main(argv=['security', 'generate_policy'])
- assert e.value.code != 0
- stderr = capsys.readouterr().err.strip()
- assert 'following arguments are required: POLICY_FILE_PATH' in stderr
+
+ additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+ path_to_pub_sub_node_script = os.path.join(
+ os.path.dirname(__file__), 'fixtures', 'pub_sub_node.py'
+ )
+ pub_sub_node_enclave = '/foo/bar'
+ pub_sub_node_namespace = '/'
+ pub_sub_node_name = 'test_generate_policy_topics_node'
+ pub_sub_node = Node(
+ executable=sys.executable,
+ arguments=[
+ path_to_pub_sub_node_script,
+ '--ros-args',
+ '-e', pub_sub_node_enclave
+ ],
+ name=pub_sub_node_name,
+ namespace=pub_sub_node_namespace,
+ additional_env=additional_env
+ )
+ path_to_client_srv_node_script = os.path.join(
+ os.path.dirname(__file__), 'fixtures', 'client_service_node.py'
+ )
+ client_srv_node_enclave = '/foo'
+ client_srv_node_namespace = '/node_ns'
+ client_srv_node_name = 'test_generate_policy_services_node'
+ client_srv_node = Node(
+ executable=sys.executable,
+ arguments=[
+ path_to_client_srv_node_script,
+ '--ros-args',
+ '-e', client_srv_node_enclave
+ ],
+ name=client_srv_node_name,
+ namespace=client_srv_node_namespace,
+ additional_env=additional_env
+ )
+ return generate_sros2_cli_test_description(
+ fixture_actions=[
+ pub_sub_node,
+ client_srv_node,
+ ReadyToTest()
+ ],
+ rmw_implementation=rmw_implementation,
+ use_daemon=use_daemon
+ ), locals()
+
+
+GENERATE_POLICY_TIMEOUT = 10 if os.name != 'nt' else 30 # seconds
+
+
+class TestSROS2GeneratePolicyVerb(SROS2CLITestCase):
+
+ def test_generate_policy_topics(
+ self,
+ pub_sub_node_name,
+ pub_sub_node_namespace,
+ pub_sub_node_enclave
+ ):
+ assert self.wait_for(expected_topics=[
+ expand_topic_name('~/pub', pub_sub_node_name, pub_sub_node_namespace),
+ expand_topic_name('~/sub', pub_sub_node_name, pub_sub_node_namespace),
+ ])
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ with self.launch_sros2_command(
+ arguments=['generate_policy', os.path.join(tmpdir, 'test-policy.xml')]
+ ) as gen_command:
+ assert gen_command.wait_for_shutdown(timeout=GENERATE_POLICY_TIMEOUT)
+ assert gen_command.exit_code == launch_testing.asserts.EXIT_OK
+
+ # Load the policy and pull out the allowed publications and subscriptions
+ policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
+ profile = policy.find(
+ path=f'enclaves/enclave[@path="{pub_sub_node_enclave}"]'
+ + f'/profiles/profile[@ns="{pub_sub_node_namespace}"]'
+ + f'[@node="{pub_sub_node_name}"]'
+ )
+ assert profile is not None
+ topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
+ assert topics_publish_allowed is not None
+ topics_subscribe_allowed = profile.find(path='topics[@subscribe="ALLOW"]')
+ assert topics_subscribe_allowed is not None
+
+ # Verify that the allowed publications include topic_pub and not topic_sub
+ topics = topics_publish_allowed.findall('topic')
+ assert len([t for t in topics if t.text == '~/pub']) == 1
+ assert len([t for t in topics if t.text == '~/sub']) == 0
+
+ # Verify that the allowed subscriptions include topic_sub and not topic_pub
+ topics = topics_subscribe_allowed.findall('topic')
+ assert len([t for t in topics if t.text == '~/sub']) == 1
+ assert len([t for t in topics if t.text == '~/pub']) == 0
+
+ def test_generate_policy_services(
+ self,
+ client_srv_node_name,
+ client_srv_node_namespace,
+ client_srv_node_enclave
+ ):
+ assert self.wait_for(expected_services=[
+ expand_topic_name('~/client', client_srv_node_name, client_srv_node_namespace),
+ expand_topic_name('~/server', client_srv_node_name, client_srv_node_namespace),
+ ])
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ with self.launch_sros2_command(
+ arguments=['generate_policy', os.path.join(tmpdir, 'test-policy.xml')]
+ ) as gen_command:
+ assert gen_command.wait_for_shutdown(timeout=GENERATE_POLICY_TIMEOUT)
+ assert gen_command.exit_code == launch_testing.asserts.EXIT_OK
+
+ # Load the policy and pull out allowed replies and requests
+ policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
+ profile = policy.find(
+ path=f'enclaves/enclave[@path="{client_srv_node_enclave}"]'
+ + f'/profiles/profile[@ns="{client_srv_node_namespace}"]'
+ + f'[@node="{client_srv_node_name}"]'
+ )
+ assert profile is not None
+ service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
+ assert service_reply_allowed is not None
+ service_request_allowed = profile.find(path='services[@request="ALLOW"]')
+ assert service_request_allowed is not None
+
+ # Verify that the allowed replies include service_server and not service_client
+ services = service_reply_allowed.findall('service')
+ assert len(
+ [s for s in services if s.text == '~/server']) == 1
+ assert len(
+ [s for s in services if s.text == '~/client']) == 0
+
+ # Verify that the allowed requests include service_client and not service_server
+ services = service_request_allowed.findall('service')
+ assert len(
+ [s for s in services if s.text == '~/client']) == 1
+ assert len(
+ [s for s in services if s.text == '~/server']) == 0
diff --git a/sros2/test/sros2/commands/security/verbs/test_generate_policy_no_nodes.py b/sros2/test/sros2/commands/security/verbs/test_generate_policy_no_nodes.py
new file mode 100644
index 00000000..ceaa17b8
--- /dev/null
+++ b/sros2/test/sros2/commands/security/verbs/test_generate_policy_no_nodes.py
@@ -0,0 +1,65 @@
+# Copyright 2019 Canonical Ltd
+# Copyright 2020 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import os
+import sys
+import tempfile
+
+import launch_testing
+from launch_testing.actions import ReadyToTest
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+
+import pytest
+
+from rclpy.utilities import get_available_rmw_implementations
+
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from utilities.sros2_cli_test_case import generate_sros2_cli_test_description # noqa: E402
+from utilities.sros2_cli_test_case import SROS2CLITestCase # noqa: E402
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation,use_daemon', itertools.product(
+ get_available_rmw_implementations(), (True, False)
+))
+@launch_testing.markers.keep_alive
+def generate_test_description(rmw_implementation, use_daemon):
+ return generate_sros2_cli_test_description(
+ fixture_actions=[ReadyToTest()],
+ rmw_implementation=rmw_implementation,
+ use_daemon=use_daemon
+ ), locals()
+
+
+class TestSROS2GeneratePolicyVerbWithNoNodes(SROS2CLITestCase):
+
+ def test_generate_policy_no_nodes(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ with self.launch_sros2_command(
+ arguments=['generate_policy', os.path.join(tmpdir, 'test-policy.xml')]
+ ) as gen_command:
+ assert gen_command.wait_for_shutdown(timeout=10)
+ assert gen_command.exit_code != launch_testing.asserts.EXIT_OK
+ assert 'No nodes detected in the ROS graph. No policy file was generated.' \
+ in gen_command.output
+
+ def test_generate_policy_no_policy_file(self):
+ with self.launch_sros2_command(arguments=['generate_policy']) as gen_command:
+ assert gen_command.wait_for_shutdown(timeout=2)
+ assert gen_command.exit_code != launch_testing.asserts.EXIT_OK
+ assert 'following arguments are required: POLICY_FILE_PATH' in gen_command.output
diff --git a/sros2/test/sros2/commands/security/verbs/utilities/__init__.py b/sros2/test/sros2/commands/security/verbs/utilities/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sros2/test/sros2/commands/security/verbs/utilities/sros2_cli_test_case.py b/sros2/test/sros2/commands/security/verbs/utilities/sros2_cli_test_case.py
new file mode 100644
index 00000000..ba90a981
--- /dev/null
+++ b/sros2/test/sros2/commands/security/verbs/utilities/sros2_cli_test_case.py
@@ -0,0 +1,111 @@
+# Copyright 2020 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import contextlib
+import time
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+from ros2cli.node.strategy import NodeStrategy
+
+
+MAX_DISCOVERY_DELAY = 4.0 # seconds
+
+
+def generate_sros2_cli_test_description(
+ fixture_actions, rmw_implementation, use_daemon
+):
+ additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+ if use_daemon:
+ # Start daemon.
+ fixture_actions = [ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=fixture_actions,
+ additional_env=additional_env
+ )]
+ return LaunchDescription([
+ # Always stop daemon to avoid cross-talk.
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=fixture_actions,
+ additional_env=additional_env
+ ),
+ ])
+
+
+class SROS2CLITestCase(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation,
+ use_daemon
+ ):
+ @contextlib.contextmanager
+ def launch_sros2_command(self, arguments):
+ cmd = ['ros2', 'security', *arguments]
+ if not use_daemon:
+ # Wait for direct node to discover fixture nodes.
+ cmd.extend(['--no-daemon', '--spin-time', f'{MAX_DISCOVERY_DELAY}'])
+ sros2_command_action = ExecuteProcess(
+ cmd=cmd,
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ 'PYTHONUNBUFFERED': '1'
+ },
+ name='ros2security-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, sros2_command_action, proc_info, proc_output,
+ output_filter=launch_testing_ros.tools.basic_output_filter(
+ # ignore launch_ros and ros2cli daemon nodes
+ filtered_patterns=['.*launch_ros*', '.*ros2cli.*'],
+ filtered_rmw_implementation=rmw_implementation
+ )
+ ) as sros2_command:
+ yield sros2_command
+ cls.launch_sros2_command = launch_sros2_command
+
+ def wait_for(self, expected_topics=[], expected_services=[]):
+ if not expected_topics and not expected_services:
+ return True
+ args = argparse.Namespace()
+ args.use_daemon = use_daemon
+ args.spin_time = MAX_DISCOVERY_DELAY
+ with NodeStrategy(args) as node:
+ start_time = time.time()
+ while time.time() - start_time < MAX_DISCOVERY_DELAY:
+ topics = [name for name, _ in node.get_topic_names_and_types()]
+ services = [name for name, _ in node.get_service_names_and_types()]
+ if all(t in topics for t in expected_topics) and \
+ all(s in services for s in expected_services):
+ return True
+ time.sleep(0.1) # this sleep time is arbitrary
+ return False
+ cls.wait_for = wait_for