From 13a16cf87f1be857cb93aa2d3f82affcb350416b Mon Sep 17 00:00:00 2001 From: Xichen96 Date: Thu, 21 Dec 2023 01:09:46 +0800 Subject: [PATCH] [dhcp_server] add config dhcp_server add (#17489) * dhcp_server add * add test dup gw nm --- .../cli-plugin-tests/conftest.py | 16 +++ .../cli-plugin-tests/mock_config_db.json | 10 ++ .../test_config_dhcp_server.py | 115 ++++++++++++++++++ .../cli/config/plugins/dhcp_server.py | 94 ++++++++++++++ 4 files changed, 235 insertions(+) create mode 100644 dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/conftest.py b/dockers/docker-dhcp-server/cli-plugin-tests/conftest.py index a51f76ac1755..4dc078f76904 100644 --- a/dockers/docker-dhcp-server/cli-plugin-tests/conftest.py +++ b/dockers/docker-dhcp-server/cli-plugin-tests/conftest.py @@ -48,8 +48,24 @@ def get(table, key, entry): if table == "STATE_DB": return mock_state_db.get(key, {}).get(entry, None) + def hmset(table, key, value): + assert table == "CONFIG_DB" or table == "STATE_DB" + if table == "CONFIG_DB": + mock_config_db[key] = value + if table == "STATE_DB": + mock_state_db[key] = value + + def exists(table, key): + assert table == "CONFIG_DB" or table == "STATE_DB" + if table == "CONFIG_DB": + return key in mock_config_db + if table == "STATE_DB": + return key in mock_state_db + db.keys = mock.Mock(side_effect=keys) db.get_all = mock.Mock(side_effect=get_all) db.get = mock.Mock(side_effect=get) + db.hmset = mock.Mock(side_effect=hmset) + db.exists = mock.Mock(side_effect=exists) yield db diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json b/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json index 908c1143544e..751f716f66c2 100644 --- a/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json +++ b/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json @@ -2,6 +2,16 @@ "FEATURE|dhcp_server": { "state": "enabled" }, + "VLAN_INTERFACE|Vlan100": { + }, + "VLAN_INTERFACE|Vlan100|100.1.1.1/24": { + }, + "VLAN_INTERFACE|Vlan200": { + }, + "VLAN_INTERFACE|Vlan200|100.1.1.2/24": { + }, + "VLAN_INTERFACE|Vlan300": { + }, "DHCP_SERVER_IPV4|Vlan100": { "gateway": "100.1.1.1", "lease_time": "3600", diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py b/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py new file mode 100644 index 000000000000..bc71154ccf51 --- /dev/null +++ b/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py @@ -0,0 +1,115 @@ +import sys +from unittest import mock +import pytest + +from click.testing import CliRunner + +import utilities_common.cli as clicommon + +sys.path.append('../cli/config/plugins/') +import dhcp_server + + +class TestConfigDHCPServer(object): + def test_plugin_registration(self): + cli = mock.MagicMock() + dhcp_server.register(cli) + + str_type = [[12, "whatever", False], + ["text", "whatever", False], + ["string", "whatever", True], + ["binary", "12abc", False], + ["binary", "123abc45", True], + ["boolean", "True", False], + ["boolean", "true", True], + ["ipv4-address", "10.10.1", False], + ["ipv4-address", "10.10.1.0", True], + ["uint8", "4500", False], + ["uint8", "-45", False], + ["uint8", "45", True], + ] + + @pytest.mark.parametrize("type, value, result", str_type) + def test_validate_str_type(self, type, value, result): + assert dhcp_server.validate_str_type(type, value) == result + + def test_config_dhcp_server_ipv4_add(self, mock_db): + expected_value = { + "gateway": "10.10.10.10", + "lease_time": "1000", + "mode": "PORT", + "netmask": "255.255.254.0", + "state": "disabled" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan200", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan200") == expected_value + + def test_config_dhcp_server_ipv4_add_dup_gw_nm(self, mock_db): + expected_value = { + "gateway": "100.1.1.2", + "lease_time": "1000", + "mode": "PORT", + "netmask": "255.255.255.0", + "state": "disabled" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan200", "--mode=PORT", "--lease_time=1000", "--dup_gw_nm"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan200") == expected_value + + def test_config_dhcp_server_ipv4_add_illegal_mode(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan200", "--mode=WHATEVER", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_add_illegal_lease_time(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan200", "--mode=PORT", "--lease_time=-1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_add_no_vlan(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan400", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_add_no_vlan_ip(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan300", "--mode=PORT", "--lease_time=1000", "--dup_gw_nm"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_add_illegal_ip(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan200", "--mode=PORT", "--lease_time=1000", "--gateway=10000.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_add_already_exist(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \ + ["Vlan100", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + diff --git a/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py b/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py index 539a71e6a40b..faf920b995c3 100644 --- a/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py +++ b/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py @@ -1,13 +1,107 @@ import click import utilities_common.cli as clicommon +import ipaddress +import string + + +SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"] + + +def validate_str_type(type, value): + """ + To validate whether type is consistent with string value + Args: + type: string, value type + value: checked value + Returns: + True, type consistent with value + False, type not consistent with value + """ + if not isinstance(value, str): + return False + if type not in SUPPORT_TYPE: + return False + if type == "string": + return True + if type == "binary": + if len(value) == 0 or len(value) % 2 != 0: + return False + return all(c in set(string.hexdigits) for c in value) + if type == "boolean": + return value in ["true", "false"] + if type == "ipv4-address": + try: + if len(value.split(".")) != 4: + return False + return ipaddress.ip_address(value).version == 4 + except ValueError: + return False + if type.startswith("uint"): + if not value.isdigit(): + return False + length = int("".join([c for c in type if c.isdigit()])) + return 0 <= int(value) <= int(pow(2, length)) - 1 + return False + @click.group(cls=clicommon.AbbreviationGroup, name="dhcp_server") def dhcp_server(): """config DHCP Server information""" + ctx = click.get_current_context() + dbconn = db.db + if dbconn.get("CONFIG_DB", "FEATURE|dhcp_server", "state") != "enabled": + ctx.fail("Feature dhcp_server is not enabled") + + +@dhcp_server.group(cls=clicommon.AliasedGroup, name="ipv4") +def dhcp_server_ipv4(): + """Show ipv4 related dhcp_server info""" pass +@dhcp_server_ipv4.command(name="add") +@click.argument("dhcp_interface", required=True) +@click.option("--mode", required=True) +@click.option("--lease_time", required=False, default="900") +@click.option("--dup_gw_nm", required=False, default=False, is_flag=True) +@click.option("--gateway", required=False) +@click.option("--netmask", required=False) +@clicommon.pass_db +def dhcp_server_ipv4_add(db, mode, lease_time, dup_gw_nm, gateway, netmask, dhcp_interface): + ctx = click.get_current_context() + if mode != "PORT": + ctx.fail("Only mode PORT is supported") + if not validate_str_type("uint32", lease_time): + ctx.fail("lease_time is required and must be nonnegative integer") + dbconn = db.db + if not dbconn.exists("CONFIG_DB", "VLAN_INTERFACE|" + dhcp_interface): + ctx.fail("dhcp_interface {} does not exist".format(dhcp_interface)) + if dup_gw_nm: + dup_success = False + for key in dbconn.keys("CONFIG_DB", "VLAN_INTERFACE|" + dhcp_interface + "|*"): + intf = ipaddress.ip_interface(key.split("|")[2]) + if intf.version != 4: + continue + dup_success = True + gateway, netmask = str(intf.ip), str(intf.netmask) + if not dup_success: + ctx.fail("failed to found gateway and netmask for Vlan interface {}".format(dhcp_interface)) + elif not validate_str_type("ipv4-address", gateway) or not validate_str_type("ipv4-address", netmask): + ctx.fail("gateway and netmask must be valid ipv4 string") + key = "DHCP_SERVER_IPV4|" + dhcp_interface + if dbconn.exists("CONFIG_DB", key): + ctx.fail("Dhcp_interface %s already exist".format(dhcp_interface)) + else: + dbconn.hmset("CONFIG_DB", key, { + "mode": mode, + "lease_time": lease_time, + "gateway": gateway, + "netmask": netmask, + "state": "disabled", + }) + + def register(cli): # cli.add_command(dhcp_server) pass