From 2b4bc679e6e197a7c2b9210a4e9d1cb175c15506 Mon Sep 17 00:00:00 2001 From: Alejandro Roiz Walss Date: Thu, 4 Apr 2024 11:31:29 -0600 Subject: [PATCH] avoid duplicate credentials --- confidant/routes/credentials.py | 25 +++++++++++++++++++++++++ confidant/services/credentialmanager.py | 13 +++++++++++++ 2 files changed, 38 insertions(+) diff --git a/confidant/routes/credentials.py b/confidant/routes/credentials.py index e186f870..927e8786 100644 --- a/confidant/routes/credentials.py +++ b/confidant/routes/credentials.py @@ -592,6 +592,7 @@ def create_credential(): correct format, or a required field was not provided. :statuscode 403: Client does not have access to create credentials. ''' + logger.info("Creating credential") if not acl_module_check(resource_type='credential', action='create'): msg = "{} does not have access to create credentials".format( authnz.get_logged_in_user() @@ -627,6 +628,16 @@ def create_credential(): for key, value in credential_pairs.items(): value = escape(value) credential_pairs[key] = value + + # Verify this credential is not a duplicate of an existing credential + is_duplicate, duplicate_id = credentialmanager.is_key_value_pair_duplicate( + credential_pairs + ) + if is_duplicate: + msg = '''Credential with the same key value pairs already exists. + See id: {0}'''.format(duplicate_id) + return jsonify({'error': msg, 'reference': duplicate_id}), 409 + credential_pairs = json.dumps(credential_pairs) data_key = keymanager.create_datakey(encryption_context={'id': id}) cipher = CipherManager(data_key['plaintext'], version=2) @@ -837,6 +848,10 @@ def update_credential(id): _cred.revision ) if 'credential_pairs' in data: + # Ensure the credential is not empty + if data['credential_pairs'] == {}: + return jsonify({'error': 'Credential Pairs cannot be empty'}), 400 + # Ensure credential pair keys are lowercase credential_pairs = credentialmanager.lowercase_credential_pairs( data['credential_pairs'] @@ -865,6 +880,16 @@ def update_credential(id): # this is a new credential pair and update last_rotation_date if credential_pairs != _cred.decrypted_credential_pairs: update['last_rotation_date'] = misc.utcnow() + + # Verify if after update the credential will not be a duplicate + is_duplicate, duplicate_id = credentialmanager.is_key_value_pair_duplicate( + credential_pairs + ) + if is_duplicate: + msg = '''Credential with the same key value pairs already exists. + See id: {0}'''.format(duplicate_id) + return jsonify({'error': msg, 'reference': duplicate_id}), 409 + data_key = keymanager.create_datakey(encryption_context={'id': id}) cipher = CipherManager(data_key['plaintext'], version=2) update['credential_pairs'] = cipher.encrypt( diff --git a/confidant/services/credentialmanager.py b/confidant/services/credentialmanager.py index ade09700..bd60a88c 100644 --- a/confidant/services/credentialmanager.py +++ b/confidant/services/credentialmanager.py @@ -59,6 +59,19 @@ def pair_key_conflicts_for_credentials(credential_ids, blind_credential_ids): return conflicts +def is_key_value_pair_duplicate(key_pairs): + logger.info("Checking if key value pair is duplicate") + logger.info("checking key_pairs: %s", key_pairs) + + all_credentials = Credential.data_type_date_index.query('credential') + + for cred in all_credentials: + if cred.decrypted_credential_pairs == key_pairs: + return True, cred.id + + return False, None + + def check_credential_pair_values(credential_pairs): for key, val in credential_pairs.items(): if isinstance(val, dict) or isinstance(val, list):