Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: harden TestNetwork and TestNmt even more #508

Merged
51 changes: 37 additions & 14 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
from threading import Event
import threading

import canopen
import can
Expand Down Expand Up @@ -57,6 +57,7 @@ def test_send_periodic(self):
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
TIMEOUT = PERIOD * 10
self.network.connect(
interface="virtual",
channel=1,
Expand All @@ -65,30 +66,52 @@ def test_send_periodic(self):
self.addCleanup(self.network.disconnect)

acc = []
event = Event()
condition = threading.Condition()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
item = data, ts
acc.append(item)
condition.notify()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
def periodicity():
# Check if periodicity is established; flakiness has been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))

# Update task data, which implicitly restarts the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))

# Stop the task.
task.stop()
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
bus = self.network.bus
msg = bus.recv(TIMEOUT)
if msg:
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
self.assertIsNone(bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down
6 changes: 5 additions & 1 deletion test/test_nmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def test_nmt_master_node_guarding(self):
self.assertEqual(msg.dlc, 0)

self.node.nmt.stop_node_guarding()
self.assertIsNone(self.bus.recv(self.TIMEOUT))
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
msg = self.bus.recv(self.TIMEOUT)
if msg:
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
self.assertIsNone(self.bus.recv(self.TIMEOUT))


class TestNmtSlave(unittest.TestCase):
Expand Down
Loading