Skip to content

Commit

Permalink
Tests: harden TestNetwork even more
Browse files Browse the repository at this point in the history
- test task.stop()
- use more lenient timeout
- make sure we always collect two frames
  • Loading branch information
erlend-aasland committed Jul 9, 2024
1 parent 3aa509d commit 3dacf57
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions test/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def test_send_periodic(self):
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
TIMEOUT = PERIOD * 10
self.network.connect(
interface="virtual",
channel=1,
Expand All @@ -68,27 +69,40 @@ def test_send_periodic(self):
event = Event()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
if len(acc) < 2:
acc.append((data, ts))
else:
event.set()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

def check(expected):
data = [v[0] for v in acc]
self.assertEqual(data, [expected] * 2)
ts = [v[1] for v in acc]
delta = (ts[-1] - ts[0]) / (len(ts) - 1)
self.assertAlmostEqual(delta, PERIOD, places=1)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)
# Wait for two frames to arrive; then check the result.
event.wait(TIMEOUT)
check(DATA1)

# Update task data.
# Update task data, which implicitly restarts the timer.
task.update(DATA2)
acc.clear()
event.clear()
event.wait(PERIOD*2)
task.stop()

data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
# Wait for two frames to arrive; then check the result.
event.wait(TIMEOUT)
check(DATA2)

# Stop the task.
task.stop()
self.assertIsNone(self.network.bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down

0 comments on commit 3dacf57

Please sign in to comment.