Skip to content

Commit

Permalink
Add router restart test
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Dec 13, 2024
1 parent fe36d04 commit a344c90
Showing 1 changed file with 115 additions and 42 deletions.
157 changes: 115 additions & 42 deletions tests/connection_restore.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,46 @@
import subprocess
import time
import sys
import os
import threading

CONNECT_TIMEOUT_S = 15
ROUTER_INIT_TIMEOUT_S = 3
WAIT_MESSAGE_TIMEOUT_S = 15
DISCONNECT_MESSAGE = "Closing session because it has expired"
CONNECT_MESSAGE = "Z_OPEN(Ack)"
ZENOH_PORT = "7447"

ROUTER_ARGS = ['-l', f'tcp/0.0.0.0:{ZENOH_PORT}', '--no-multicast-scouting']

DIR_EXAMPLES = "build/examples"
TEST_COMMANDS = [
# Active client
["stdbuf", "-o0", f'{DIR_EXAMPLES}/z_pub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'],
# Passive client
["stdbuf", "-o0", f'{DIR_EXAMPLES}/z_sub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'],
]
ACTIVE_CLIENT_COMMAND = ["stdbuf", "-o0", f'{DIR_EXAMPLES}/z_pub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']
PASSIVE_CLIENT_COMMAND = ["stdbuf", "-o0", f'{DIR_EXAMPLES}/z_sub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']


def run_process(command, output_collector, process_list):
print(f"Run {command}")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
process_list.append(process)
for line in iter(process.stdout.readline, ''):
print("--", line.strip())
output_collector.append(line.strip())
if output_collector is not None:
output_collector.append(line.strip())
process.stdout.close()
process.wait()


def run_background(command, output_collector, process_list):
thread = threading.Thread(target=run_process, args=(command, output_collector, process_list))
thread.start()


def terminate_processes(process_list):
for process in process_list:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process_list.clear()


def block_connection():
Expand All @@ -49,22 +55,74 @@ def unblock_connection():

def wait_message(client_output, message):
start_time = time.time()
while time.time() - start_time < CONNECT_TIMEOUT_S:
if any(CONNECT_MESSAGE in line for line in client_output):
while time.time() - start_time < WAIT_MESSAGE_TIMEOUT_S:
if any(message in line for line in client_output):
return True
time.sleep(1)
else:
return False
return False


def test_connection(client_command, timeout, wait_disconnect):
print(f"Test {client_command} for timeout {timeout}")
def test_connection(router_command, client_command, timeout, wait_disconnect):
print(f"Drop test {client_command} for timeout {timeout}")
client_output = []
process_list = []
blocked = False
try:
client_thread = threading.Thread(target=run_process, args=(client_command, client_output, process_list))
client_thread.start()
run_background(router_command, None, process_list)
time.sleep(ROUTER_INIT_TIMEOUT_S)

run_background(client_command, client_output, process_list)

# Two iterations because there was an error on the second reconnection
for _ in range(2):
if wait_message(client_output, CONNECT_MESSAGE):
print("Initial connection successful.")
else:
print("Connection failed.")
return False

client_output.clear()
print("Blocking connection...")
block_connection()
blocked = True
time.sleep(timeout)

if wait_disconnect:
if wait_message(client_output, DISCONNECT_MESSAGE):
print("Connection lost successfully.")
else:
print("Failed to block connection.")
return False

client_output.clear()
print("Unblocking connection...")
unblock_connection()
blocked = False

if wait_message(client_output, CONNECT_MESSAGE):
print("Connection restored successfully.")
else:
print("Failed to restore connection.")
return False

print(f"Drop test {client_command} for timeout {timeout} passed")
return True
finally:
if blocked:
unblock_connection()
terminate_processes(process_list)


def test_restart(router_command, client_command, timeout):
print(f"Restart test {client_command} for timeout {timeout}")
client_output = []
router_process_list = []
client_process_list = []
try:
run_background(router_command, None, router_process_list)
time.sleep(ROUTER_INIT_TIMEOUT_S)

run_background(client_command, client_output, client_process_list)

if wait_message(client_output, CONNECT_MESSAGE):
print("Initial connection successful.")
Expand All @@ -73,22 +131,12 @@ def test_connection(client_command, timeout, wait_disconnect):
return False

client_output.clear()
print("Blocking connection...")
block_connection()
blocked = True
print("Stop router...")
terminate_processes(router_process_list)
time.sleep(timeout)

if wait_disconnect:
if wait_message(client_output, DISCONNECT_MESSAGE):
print("Connection lost successfully.")
else:
print("Failed to block connection.")
return False

client_output.clear()
print("Unblocking connection...")
unblock_connection()
blocked = False
print("Start router...")
run_background(router_command, None, router_process_list)
time.sleep(timeout)

if wait_message(client_output, CONNECT_MESSAGE):
Expand All @@ -97,19 +145,44 @@ def test_connection(client_command, timeout, wait_disconnect):
print("Failed to restore connection.")
return False

print(f"Test for timeout {timeout} passed.")
print(f"Restart test {client_command} for timeout {timeout} passed")
return True
finally:
if blocked:
unblock_connection()
terminate_processes(process_list)
terminate_processes(client_process_list + router_process_list)


def test_connction_drop(router_command):
# timeout more than sesson timeout
# if not test_connection(router_command, ACTIVE_CLIENT_COMMAND, 8, False):
# sys.exit(1)
# timeout less than sesson timeout
if not test_connection(router_command, ACTIVE_CLIENT_COMMAND, 15, True):
sys.exit(1)
# timeout more than sesson timeout
# if not test_connection(router_command, PASSIVE_CLIENT_COMMAND, 8, False):
# sys.exit(1)
# timeout less than sesson timeout
if not test_connection(router_command, PASSIVE_CLIENT_COMMAND, 15, True):
sys.exit(1)


def test_router_restart(router_command):
if not test_restart(router_command, ACTIVE_CLIENT_COMMAND, 15):
sys.exit(1)
if not test_restart(router_command, PASSIVE_CLIENT_COMMAND, 15):
sys.exit(1)


def main():
if len(sys.argv) != 2:
print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd")
sys.exit(1)

router_command = [sys.argv[1]] + ROUTER_ARGS

test_connction_drop(router_command)
test_router_restart(router_command)


if __name__ == "__main__":
for cmd in TEST_COMMANDS:
# timeout more than sesson timeout
if not test_connection(cmd, 8, False):
sys.exit(1)
# timeout less than sesson timeout
if not test_connection(cmd, 15, False):
sys.exit(1)
main()

0 comments on commit a344c90

Please sign in to comment.