Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: establish test/util.py and test/test_nmt.py #503

Merged
merged 2 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added test/__init__.py
Empty file.
6 changes: 2 additions & 4 deletions test/test_eds.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import os
import unittest

import canopen
from canopen.objectdictionary.eds import _signed_int_from_hex
from canopen.utils import pretty_index


SAMPLE_EDS = os.path.join(os.path.dirname(__file__), 'sample.eds')
DATATYPES_EDS = os.path.join(os.path.dirname(__file__), 'datatypes.eds')
from .util import SAMPLE_EDS, DATATYPES_EDS


class TestEDS(unittest.TestCase):
Expand Down
72 changes: 3 additions & 69 deletions test/test_local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import os
import unittest
import canopen
import logging
import time
import unittest

# logging.basicConfig(level=logging.DEBUG)

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
import canopen
from .util import EDS_PATH


class TestSDO(unittest.TestCase):
Expand Down Expand Up @@ -172,68 +168,6 @@ def test_callbacks(self):
self.assertEqual(self._kwargs["data"], b"\x03\x04")


class TestNMT(unittest.TestCase):
"""
Test NMT slave.
"""

@classmethod
def setUpClass(cls):
cls.network1 = canopen.Network()
cls.network1.connect("test", interface="virtual")
cls.remote_node = cls.network1.add_node(2, EDS_PATH)

cls.network2 = canopen.Network()
cls.network2.connect("test", interface="virtual")
cls.local_node = cls.network2.create_node(2, EDS_PATH)

cls.remote_node2 = cls.network1.add_node(3, EDS_PATH)

cls.local_node2 = cls.network2.create_node(3, EDS_PATH)

@classmethod
def tearDownClass(cls):
cls.network1.disconnect()
cls.network2.disconnect()

def test_start_two_remote_nodes(self):
self.remote_node.nmt.state = 'OPERATIONAL'
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, 'OPERATIONAL')

self.remote_node2.nmt.state = 'OPERATIONAL'
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, 'OPERATIONAL')

def test_stop_two_remote_nodes_using_broadcast(self):
# This is a NMT broadcast "Stop remote node"
# ie. set the node in STOPPED state
self.network1.send_message(0, [2, 0])

# Line below is just so that we are sure the slaves have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, 'STOPPED')
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, 'STOPPED')

def test_heartbeat(self):
# self.assertEqual(self.remote_node.nmt.state, 'INITIALISING')
# self.assertEqual(self.local_node.nmt.state, 'INITIALISING')
self.local_node.nmt.state = 'OPERATIONAL'
self.local_node.sdo[0x1017].raw = 100
time.sleep(0.2)
self.assertEqual(self.remote_node.nmt.state, 'OPERATIONAL')

self.local_node.nmt.stop_heartbeat()

class TestPDO(unittest.TestCase):
"""
Test PDO slave.
Expand Down
6 changes: 2 additions & 4 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import time
import os
import unittest
import canopen

import canopen
import can

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
from .util import EDS_PATH


class TestNetwork(unittest.TestCase):
Expand Down
64 changes: 64 additions & 0 deletions test/test_nmt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import time
import unittest

import canopen
from .util import EDS_PATH


class TestNmtSlave(unittest.TestCase):
def setUp(self):
self.network1 = canopen.Network()
self.network1.connect("test", interface="virtual")
self.remote_node = self.network1.add_node(2, EDS_PATH)

self.network2 = canopen.Network()
self.network2.connect("test", interface="virtual")
self.local_node = self.network2.create_node(2, EDS_PATH)
self.remote_node2 = self.network1.add_node(3, EDS_PATH)
self.local_node2 = self.network2.create_node(3, EDS_PATH)

def tearDown(self):
self.network1.disconnect()
self.network2.disconnect()

def test_start_two_remote_nodes(self):
self.remote_node.nmt.state = "OPERATIONAL"
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, "OPERATIONAL")

self.remote_node2.nmt.state = "OPERATIONAL"
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, "OPERATIONAL")

def test_stop_two_remote_nodes_using_broadcast(self):
# This is a NMT broadcast "Stop remote node"
# ie. set the node in STOPPED state
self.network1.send_message(0, [2, 0])

# Line below is just so that we are sure the slaves have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, "STOPPED")
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, "STOPPED")

def test_heartbeat(self):
self.assertEqual(self.remote_node.nmt.state, "INITIALISING")
self.assertEqual(self.local_node.nmt.state, "INITIALISING")
self.local_node.nmt.state = "OPERATIONAL"
self.local_node.sdo[0x1017].raw = 100
time.sleep(0.2)
self.assertEqual(self.remote_node.nmt.state, "OPERATIONAL")

self.local_node.nmt.stop_heartbeat()


if __name__ == "__main__":
unittest.main()
5 changes: 2 additions & 3 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os.path
import unittest
import canopen

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
import canopen
from .util import EDS_PATH


class TestPDO(unittest.TestCase):
Expand Down
16 changes: 5 additions & 11 deletions test/test_sdo.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import os
import unittest
# import binascii

import canopen
from canopen.objectdictionary import ODVariable
import canopen.objectdictionary.datatypes as dt
from canopen.objectdictionary import ODVariable
from .util import SAMPLE_EDS, DATATYPES_EDS

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
DATAEDS_PATH = os.path.join(os.path.dirname(__file__), 'datatypes.eds')

TX = 1
RX = 2
Expand All @@ -26,17 +24,15 @@ def _send_message(self, can_id, data, remote=False):
"""
next_data = self.data.pop(0)
self.assertEqual(next_data[0], TX, "No transmission was expected")
# print(f"> {binascii.hexlify(data)} ({binascii.hexlify(next_data[1])})")
self.assertSequenceEqual(data, next_data[1])
self.assertEqual(can_id, 0x602)
while self.data and self.data[0][0] == RX:
# print(f"< {binascii.hexlify(self.data[0][1])}")
self.network.notify(0x582, self.data.pop(0)[1], 0.0)

def setUp(self):
network = canopen.Network()
network.send_message = self._send_message
node = network.add_node(2, EDS_PATH)
node = network.add_node(2, SAMPLE_EDS)
node.sdo.RESPONSE_TIMEOUT = 0.01
self.network = network

Expand Down Expand Up @@ -178,17 +174,15 @@ def _send_message(self, can_id, data, remote=False):
"""
next_data = self.data.pop(0)
self.assertEqual(next_data[0], TX, "No transmission was expected")
# print("> %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1])))
self.assertSequenceEqual(data, next_data[1])
self.assertEqual(can_id, 0x602)
while self.data and self.data[0][0] == RX:
# print("< %s" % binascii.hexlify(self.data[0][1]))
self.network.notify(0x582, self.data.pop(0)[1], 0.0)

def setUp(self):
network = canopen.Network()
network.send_message = self._send_message
node = network.add_node(2, DATAEDS_PATH)
node = network.add_node(2, DATATYPES_EDS)
node.sdo.RESPONSE_TIMEOUT = 0.01
self.node = node
self.network = network
Expand Down
6 changes: 6 additions & 0 deletions test/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import os


DATATYPES_EDS = os.path.join(os.path.dirname(__file__), "datatypes.eds")
SAMPLE_EDS = os.path.join(os.path.dirname(__file__), "sample.eds")
EDS_PATH = SAMPLE_EDS
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
Loading