diff --git a/irrd/updates/handler.py b/irrd/updates/handler.py index 11028bc9..2feab4c8 100644 --- a/irrd/updates/handler.py +++ b/irrd/updates/handler.py @@ -6,6 +6,7 @@ from ordered_set import OrderedSet from irrd.conf import get_setting +from irrd.rpsl.rpsl_objects import RPSLMntner from irrd.storage.database_handler import DatabaseHandler from irrd.storage.queries import RPSLDatabaseQuery from irrd.utils import email @@ -80,8 +81,8 @@ def load_change_submission(self, data: RPSLChangeSubmission, delete=False, reque return self def _handle_change_requests(self, change_requests: List[ChangeRequest], - reference_validator: ReferenceValidator, - auth_validator: AuthValidator) -> None: + reference_validator: ReferenceValidator, + auth_validator: AuthValidator) -> None: # When an object references another object, e.g. tech-c referring a person or mntner, # an add/update is only valid if those referred objects exist. To complicate matters, @@ -101,7 +102,12 @@ def _handle_change_requests(self, change_requests: List[ChangeRequest], while valid_changes != previous_valid_changes: previous_valid_changes = valid_changes reference_validator.preload(valid_changes) - auth_validator.pre_approve(valid_changes) + valid_potential_new_mntners = [ + r.rpsl_obj_new + for r in valid_changes + if r.request_type == UpdateRequestType.CREATE and isinstance(r.rpsl_obj_new, RPSLMntner) + ] + auth_validator.pre_approve(valid_potential_new_mntners) for result in valid_changes: result.validate() @@ -261,4 +267,3 @@ def _request_meta_str(self): if request_meta_str: request_meta_str = '\n' + request_meta_str + '\n\n' return request_meta_str - diff --git a/irrd/updates/tests/test_parser.py b/irrd/updates/tests/test_parser.py index 44aeed87..430a0785 100644 --- a/irrd/updates/tests/test_parser.py +++ b/irrd/updates/tests/test_parser.py @@ -416,7 +416,7 @@ def test_check_auth_valid_create_mntner_referencing_self(self, prepare_mocks): result_mntner = parse_change_requests(SAMPLE_MNTNER + 'override: override-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) + auth_validator.pre_approve([result_mntner.rpsl_obj_new]) assert result_mntner._check_auth() assert not result_mntner.error_messages @@ -437,7 +437,7 @@ def test_check_auth_invalid_create_mntner_referencing_self_wrong_override_passwo result_mntner = parse_change_requests(SAMPLE_MNTNER + 'override: invalid-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) + auth_validator.pre_approve([result_mntner.rpsl_obj_new]) assert not result_mntner._check_auth() assert result_mntner.error_messages == [ @@ -467,7 +467,7 @@ def test_check_auth_valid_update_mntner_submits_new_object_with_all_dummy_hash_v data = data.replace('$1$fgW84Y9r$kKEn9MUq8PChNKpQhO6BM.', PASSWORD_HASH_DUMMY_VALUE) result_mntner = parse_change_requests(data + 'password: crypt-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) + auth_validator.pre_approve([result_mntner.rpsl_obj_new]) assert result_mntner._check_auth() assert not result_mntner.error_messages assert result_mntner.info_messages == ['As you submitted dummy hash values, all password hashes on this object ' @@ -504,7 +504,7 @@ def test_check_auth_invalid_update_mntner_submits_new_object_with_mixed_dummy_ha data = SAMPLE_MNTNER.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE) result_mntner = parse_change_requests(data + 'password: md5-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) + auth_validator.pre_approve([result_mntner.rpsl_obj_new]) assert not result_mntner.is_valid() assert result_mntner.error_messages == [ 'Either all password auth hashes in a submitted mntner must be dummy objects, or none.', @@ -524,7 +524,7 @@ def test_check_auth_invalid_update_mntner_submits_new_object_with_dummy_hash_mul data = data.replace('$1$fgW84Y9r$kKEn9MUq8PChNKpQhO6BM.', PASSWORD_HASH_DUMMY_VALUE) result_mntner = parse_change_requests(data + 'password: md5-password\npassword: other-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) + auth_validator.pre_approve([result_mntner.rpsl_obj_new]) result_mntner._check_auth() assert not result_mntner.is_valid() assert result_mntner.error_messages == [ @@ -544,7 +544,6 @@ def test_check_auth_invalid_update_mntner_wrong_password_current_db_object(self, # This password is valid for the new object, but invalid for the current version in the DB result_mntner = parse_change_requests(SAMPLE_MNTNER + 'password: crypt-password', mock_dh, auth_validator, reference_validator)[0] - auth_validator.pre_approve([result_mntner]) assert not result_mntner._check_auth() assert result_mntner.error_messages == [ 'Authorisation for mntner TEST-MNT failed: must be authenticated by one of: TEST-MNT, ' diff --git a/irrd/updates/tests/test_validators.py b/irrd/updates/tests/test_validators.py new file mode 100644 index 00000000..c47bbb84 --- /dev/null +++ b/irrd/updates/tests/test_validators.py @@ -0,0 +1,435 @@ +# flake8: noqa: W293 +import itertools +from unittest.mock import Mock + +import pytest +from pytest import raises + +from irrd.rpsl.rpsl_objects import rpsl_object_from_text +from irrd.utils.rpsl_samples import (SAMPLE_MNTNER, SAMPLE_MNTNER_CRYPT, + SAMPLE_MNTNER_MD5, SAMPLE_PERSON, + SAMPLE_ROUTE, SAMPLE_ROUTE6) +from irrd.utils.test_utils import flatten_mock_calls +from irrd.utils.text import remove_auth_hashes + +from ..validators import AuthValidator + +VALID_PW = 'override-password' +INVALID_PW = 'not-override-password' +VALID_PW_HASH = '$1$J6KycItM$MbPaBU6iFSGFV299Rk7Di0' + + +@pytest.fixture() +def prepare_mocks(monkeypatch): + mock_dh = Mock() + mock_dq = Mock() + monkeypatch.setattr('irrd.updates.parser.RPSLDatabaseQuery', lambda: mock_dq) + monkeypatch.setattr('irrd.updates.validators.RPSLDatabaseQuery', lambda: mock_dq) + + validator = AuthValidator(mock_dh, None) + yield validator, mock_dq, mock_dh + + +class TestAuthValidatorOverride: + def test_valid_override(self, prepare_mocks, config_override): + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + validator, mock_dq, mock_dh = prepare_mocks + person = rpsl_object_from_text(SAMPLE_PERSON) + + validator.overrides = [VALID_PW] + result = validator.process_auth(person, None) + assert result.is_valid(), result.error_messages + assert result.used_override + + person = rpsl_object_from_text(SAMPLE_PERSON) + result = validator.process_auth(person, person) + assert result.is_valid(), result.error_messages + assert result.used_override + + def test_invalid_or_missing_override(self, prepare_mocks, config_override): + # This test mostly ignores the regular process that happens + # after override validation fails. + validator, mock_dq, mock_dh = prepare_mocks + mock_dh.execute_query = lambda q: [] + person = rpsl_object_from_text(SAMPLE_PERSON) + + validator.overrides = [VALID_PW] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + validator.overrides = [] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + validator.overrides = [INVALID_PW] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + config_override({ + 'auth': {'override_password': 'not-valid-hash'}, + }) + person = rpsl_object_from_text(SAMPLE_PERSON) + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + +class TestAuthValidator: + def test_valid_new_person(self, prepare_mocks): + validator, mock_dq, mock_dh = prepare_mocks + person = rpsl_object_from_text(SAMPLE_PERSON) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(person, None) + assert result.is_valid(), result.error_messages + assert not result.used_override + assert len(result.mntners_notify) == 1 + assert result.mntners_notify[0].pk() == 'TEST-MNT' + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ] + + def test_existing_person_mntner_change(self, prepare_mocks): + validator, mock_dq, mock_dh = prepare_mocks + # TEST-MNT is in both maintainers + person_new = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-NEW-MNT\n') + person_old = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-OLD-MNT\n') + query_results = itertools.cycle([ + [ + { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-NEW-MNT').replace('MD5', 'nomd5') + }, { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('MD5', 'nomd5').replace('CRYPT', 'nocrypt') + }, + ], + [ + { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-OLD-MNT').replace('CRYPT', 'nocrypt') + }, + ], + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_CRYPT, SAMPLE_MNTNER_MD5] + result = validator.process_auth(person_new, person_old) + + assert result.is_valid(), result.error_messages + assert not result.used_override + assert {m.pk() for m in result.mntners_notify} == {'TEST-MNT', 'TEST-OLD-MNT'} + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT', 'TEST-NEW-MNT'},), {}], + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-OLD-MNT'},), {}], # TEST-MNT is cached + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(person_new, person_old) + assert not result.is_valid() + print(result.error_messages) + assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' + 'must be authenticated by one of: TEST-MNT, TEST-NEW-MNT'} + + validator.passwords = [SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(person_new, person_old) + assert not result.is_valid() + assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' + 'must be authenticated by one of: TEST-MNT, TEST-OLD-MNT'} + + def test_valid_new_person_preapproved_mntner(self, prepare_mocks): + validator, mock_dq, mock_dh = prepare_mocks + person = rpsl_object_from_text(SAMPLE_PERSON) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + validator.pre_approve([rpsl_object_from_text(SAMPLE_MNTNER)]) + + result = validator.process_auth(person, None) + assert result.is_valid(), result.error_messages + assert not result.used_override + assert len(result.mntners_notify) == 1 + assert result.mntners_notify[0].pk() == 'TEST-MNT' + + def test_create_mntner_requires_override(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + mntner = rpsl_object_from_text(SAMPLE_MNTNER) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(mntner, None) + assert not result.is_valid() + assert not result.used_override + assert result.error_messages == {'New mntner objects must be added by an administrator.'} + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT', 'OTHER1-MNT', 'OTHER2-MNT'},), {}], + ] + + validator.overrides = [VALID_PW] + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + + result = validator.process_auth(mntner, None) + assert result.is_valid(), result.error_messages + assert result.used_override + + def test_modify_mntner(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + mntner = rpsl_object_from_text(SAMPLE_MNTNER) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + # This counts as submitting all new hashes. + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(mntner, mntner) + assert result.is_valid() + assert not result.info_messages + + # This counts as submitting all new hashes, but not matching any password + new_mntner = rpsl_object_from_text(SAMPLE_MNTNER.replace('CRYPT', '').replace('MD5', '')) + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(new_mntner, mntner) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation failed for the auth methods on this mntner object.' + } + + # This counts as submitting all dummy hashes. + mntner_no_auth_hashes = remove_auth_hashes(SAMPLE_MNTNER) + new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) + result = validator.process_auth(new_mntner, mntner) + assert result.is_valid() + assert not new_mntner.has_dummy_auth_value() + assert result.info_messages == { + 'As you submitted dummy hash values, all password hashes on this ' + 'object were replaced with a new MD5-PW hash of the password you ' + 'provided for authentication.' + } + + # # This is a multi password submission with dummy hashes which is rejected + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) + result = validator.process_auth(new_mntner, mntner) + assert not result.is_valid() + assert not result.info_messages + assert result.error_messages == { + 'Object submitted with dummy hash values, but multiple or no passwords ' + 'submitted. Either submit only full hashes, or a single password.' + } + + +class TestAuthValidatorRelatedObjects: + def test_related_route_exact_inetnum(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [{ + # attempt to look for exact inetnum + 'object_class': 'inetnum', + 'rpsl_pk': '192.0.2.0-192.0.2.255', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255' + } + + config_override({'auth': {'authenticate_related_mntners': False}}) + result = validator.process_auth(route, None) + assert result.is_valid() + config_override({}) + + result = validator.process_auth(route, route) + assert result.is_valid() + + def test_related_route_less_specific_inetnum(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [], # attempt to look for exact inetnum + [{ + # attempt to look for one level less specific inetnum + 'object_class': 'inetnum', + 'rpsl_pk': '192.0.2.0-192.0.2.255', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent inetnum 192.0.2.0-192.0.2.255' + } + + def test_related_route_less_specific_route(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER.replace('MD5', '')}], # mntner for object + [], # attempt to look for exact inetnum + [], # attempt to look for one level less specific inetnum + [{ + # attempt to look for less specific route + 'object_class': 'route', + 'rpsl_pk': '192.0.2.0/24AS65537', + 'parsed_data': {'mnt-by': ['RELATED-MNT']} + }], + [{'object_text': SAMPLE_MNTNER.replace('CRYPT', '')}], # related mntner retrieval + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(route, None) + assert result.is_valid() + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_exact', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inetnum'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['route'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('192.0.2.0/24',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'RELATED-MNT'},), {}] + ] + + validator = AuthValidator(mock_dh, None) + validator.passwords = [SAMPLE_MNTNER_CRYPT] # related only has MD5, so this is invalid + result = validator.process_auth(route, None) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation for route 192.0.2.0/24AS65537 failed: must be authenticated by one of: ' + 'RELATED-MNT - from parent route 192.0.2.0/24AS65537' + } + + def test_related_route_no_match_v6(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + route = rpsl_object_from_text(SAMPLE_ROUTE6) + query_results = itertools.cycle([ + [{'object_text': SAMPLE_MNTNER}], # mntner for object + [], # attempt to look for exact inetnum + [], # attempt to look for one level less specific inetnum + [], # attempt to look for less specific route + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(route, None) + assert result.is_valid() + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ['sources', (['TEST'],), {}], + + ['object_classes', (['inet6num'],), {}], + ['first_only', (), {}], + ['ip_exact', ('2001:db8::/48',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['inet6num'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('2001:db8::/48',), {}], + + ['sources', (['TEST'],), {}], + ['object_classes', (['route6'],), {}], + ['first_only', (), {}], + ['ip_less_specific_one_level', ('2001:db8::/48',), {}], + ] diff --git a/irrd/updates/validators.py b/irrd/updates/validators.py index 8922754c..c159f739 100644 --- a/irrd/updates/validators.py +++ b/irrd/updates/validators.py @@ -146,11 +146,13 @@ class AuthValidator: def __init__(self, database_handler: DatabaseHandler, keycert_obj_pk=None) -> None: self.database_handler = database_handler + self.passwords = [] + self.overrides = [] self._mntner_db_cache: Set[RPSLMntner] = set() self._pre_approved: Set[str] = set() self.keycert_obj_pk = keycert_obj_pk - def pre_approve(self, results: List['ChangeRequest']) -> None: + def pre_approve(self, presumed_valid_new_mntners: List[RPSLMntner]) -> None: """ Pre-approve certain maintainers that are part of this batch of updates. This is required for creating new maintainers along with other objects. @@ -161,10 +163,7 @@ def pre_approve(self, results: List['ChangeRequest']) -> None: When the new mntner object's mnt-by is checked, there is an additional check to verify that it passes the newly submitted authentication. """ - self._pre_approved = set() - for request in results: - if request.is_valid() and request.request_type == UpdateRequestType.CREATE and isinstance(request.rpsl_obj_new, RPSLMntner): - self._pre_approved.add(request.rpsl_obj_new.pk()) + self._pre_approved = {obj.pk() for obj in presumed_valid_new_mntners} def process_auth(self, rpsl_obj_new: RPSLObject, rpsl_obj_current: Optional[RPSLObject]) -> ValidatorResult: """ @@ -252,7 +251,7 @@ def _check_mntners(self, mntner_pk_list: List[str], source: str) -> Tuple[bool, Returns True if at least one of the mntners in mntner_list passes authentication, given self.passwords and self.keycert_obj_pk. Updates and checks self._mntner_db_cache - to prevent double checking of maintainers. + to prevent double retrieval of maintainers. """ mntner_pk_set = set(mntner_pk_list) mntner_objs: List[RPSLMntner] = [ @@ -294,7 +293,8 @@ def _generate_failure_message(self, result: ValidatorResult, failed_mntner_list: def _find_related_mntners(self, rpsl_obj_new: RPSLObject) -> Optional[Tuple[str, str, List[str]]]: """ Find the maintainers of the related object to rpsl_obj_new, if any. - This is used to authorise creating objects. + This is used to authorise creating objects - authentication may be + required to pass for the related object as well. Returns a tuple of: - object class of the related object @@ -302,35 +302,47 @@ def _find_related_mntners(self, rpsl_obj_new: RPSLObject) -> Optional[Tuple[str, - List of maintainers for the related object (at least one must pass) Returns None of no related objects were found that should be authenticated. """ - if rpsl_obj_new.rpsl_object_class not in ['route', 'route6']: - return None + related_object = None + if rpsl_obj_new.rpsl_object_class in ['route', 'route6']: + related_object = self._find_related_object_route(rpsl_obj_new) + + if related_object: + mntners = related_object.get('parsed_data', {}).get('mnt-by', []) + return related_object['object_class'], related_object['rpsl_pk'], mntners + return None + + def _find_related_object_route(self, rpsl_obj_new: RPSLObject): + """ + Find the related inetnum/route object to rpsl_obj_new, which must be a route(6). + Returns a dict as returned by the database handler. + """ inetnum_class = { 'route': 'inetnum', 'route6': 'inet6num', } - def init_query(rpsl_object_class: str) -> RPSLDatabaseQuery: - query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()]) - query = query.object_classes([rpsl_object_class]) - return query.first_only() - object_class = inetnum_class[rpsl_obj_new.rpsl_object_class] - query = init_query(object_class).ip_exact(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_exact(rpsl_obj_new.prefix) inetnums = list(self.database_handler.execute_query(query)) + if not inetnums: - query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix) inetnums = list(self.database_handler.execute_query(query)) if inetnums: - mntners = inetnums[0].get('parsed_data', {}).get('mnt-by', []) - return inetnums[0]['object_class'], inetnums[0]['rpsl_pk'], mntners + return inetnums[0] object_class = rpsl_obj_new.rpsl_object_class - query = init_query(object_class).ip_less_specific_one_level(rpsl_obj_new.prefix) + query = _init_related_object_query(object_class, rpsl_obj_new).ip_less_specific_one_level(rpsl_obj_new.prefix) routes = list(self.database_handler.execute_query(query)) if routes: - mntners = routes[0].get('parsed_data', {}).get('mnt-by', []) - return routes[0]['object_class'], routes[0]['rpsl_pk'], mntners + return routes[0] return None + + +def _init_related_object_query(rpsl_object_class: str, rpsl_obj_new: RPSLObject) -> RPSLDatabaseQuery: + query = RPSLDatabaseQuery().sources([rpsl_obj_new.source()]) + query = query.object_classes([rpsl_object_class]) + return query.first_only() diff --git a/irrd/utils/rpsl_samples.py b/irrd/utils/rpsl_samples.py index eafcb764..da34ff00 100644 --- a/irrd/utils/rpsl_samples.py +++ b/irrd/utils/rpsl_samples.py @@ -494,6 +494,8 @@ source: TEST """ +SAMPLE_MNTNER_MD5 = 'md5-password' +SAMPLE_MNTNER_CRYPT = 'crypt-password' SAMPLE_MNTNER = """mntner: TEST-MNT admin-c: PERSON-TEST notify: notify@example.net