From fdea642724b13d59f10fbf056c66e4c603f58529 Mon Sep 17 00:00:00 2001 From: Italo Sampaio Date: Wed, 4 Sep 2024 18:49:12 -0300 Subject: [PATCH 1/3] Fix verify_attestation.py to accept distinct versions for UI and Signer - Version MINOR is now allowed to be distinct between UI and Signer - Script outputs the installed version of both apps --- middleware/admin/verify_attestation.py | 36 ++++++++++++++++--- .../tests/admin/test_verify_attestation.py | 2 ++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/middleware/admin/verify_attestation.py b/middleware/admin/verify_attestation.py index e22caf10..514c2d60 100644 --- a/middleware/admin/verify_attestation.py +++ b/middleware/admin/verify_attestation.py @@ -28,8 +28,8 @@ from .certificate import HSMCertificate -UI_MESSAGE_HEADER = b"HSM:UI:5.1" -SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.1" +UI_MESSAGE_HEADER = b"HSM:UI:5.X" +SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.X" UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" UD_VALUE_LENGTH = 32 PUBKEY_COMPRESSED_LENGTH = 33 @@ -45,6 +45,26 @@ "dad609" +def validate_ui_message_header(ui_message): + minor_offset = len(UI_MESSAGE_HEADER) - 1 + if ui_message[:minor_offset] != UI_MESSAGE_HEADER[:minor_offset]: + raise AdminError() + version_minor = ui_message[minor_offset] + # The minor version must be a single digit between 0 and 9 + if version_minor < 48 or version_minor > 57: + raise AdminError() + + +def validate_signer_message_header(signer_message): + minor_offset = len(SIGNER_MESSAGE_HEADER) - 1 + if signer_message[:minor_offset] != SIGNER_MESSAGE_HEADER[:minor_offset]: + raise AdminError() + version_minor = signer_message[minor_offset] + # The minor version must be a single digit between 0 and 9 + if version_minor < 48 or version_minor > 57: + raise AdminError() + + def do_verify_attestation(options): head("### -> Verify UI and Signer attestations", fill="#") @@ -122,7 +142,9 @@ def do_verify_attestation(options): ui_message = bytes.fromhex(ui_result[1]) ui_hash = bytes.fromhex(ui_result[2]) mh_len = len(UI_MESSAGE_HEADER) - if ui_message[:mh_len] != UI_MESSAGE_HEADER: + try: + validate_ui_message_header(ui_message) + except Exception: raise AdminError( f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}") @@ -138,6 +160,7 @@ def do_verify_attestation(options): mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH + SIGNER_HASH_LENGTH + SIGNER_ITERATION_LENGTH] signer_iteration = int.from_bytes(signer_iteration, byteorder='big', signed=False) + ui_version = ui_message[mh_len - 3:mh_len] head( [ @@ -147,6 +170,7 @@ def do_verify_attestation(options): f"Authorized signer hash: {signer_hash}", f"Authorized signer iteration: {signer_iteration}", f"Installed UI hash: {ui_hash.hex()}", + f"Installed UI version: {ui_version.decode()}", ], fill="-", ) @@ -163,7 +187,9 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) mh_len = len(SIGNER_MESSAGE_HEADER) - if signer_message[:mh_len] != SIGNER_MESSAGE_HEADER: + try: + validate_signer_message_header(signer_message) + except Exception: raise AdminError( f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}") @@ -173,12 +199,14 @@ def do_verify_attestation(options): f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}" f" but attestation reports {reported}" ) + signer_version = signer_message[mh_len - 3:mh_len] head( ["Signer verified with public keys:"] + pubkeys_output + [ "", f"Hash: {signer_message[mh_len:].hex()}", f"Installed Signer hash: {signer_hash.hex()}", + f"Installed Signer version: {signer_version.decode()}", ], fill="-", ) diff --git a/middleware/tests/admin/test_verify_attestation.py b/middleware/tests/admin/test_verify_attestation.py index 4da01987..c651c2c7 100644 --- a/middleware/tests/admin/test_verify_attestation.py +++ b/middleware/tests/admin/test_verify_attestation.py @@ -108,6 +108,7 @@ def test_verify_attestation(self, f"Authorized signer hash: {'cc'*32}", "Authorized signer iteration: 291", f"Installed UI hash: {'ee'*32}", + "Installed UI version: 5.1", ], fill="-", ) @@ -118,6 +119,7 @@ def test_verify_attestation(self, "", f"Hash: {self.pubkeys_hash.hex()}", f"Installed Signer hash: {'ff'*32}", + "Installed Signer version: 5.1", ], fill="-", ) From f55c161ac3947ea88457c957fd6502315e7eb38c Mon Sep 17 00:00:00 2001 From: Italo Sampaio Date: Thu, 5 Sep 2024 12:32:51 -0300 Subject: [PATCH 2/3] Changes after code review - Using regex to validate UI and Signer headers - Added unit tests for header validation functions --- middleware/admin/verify_attestation.py | 33 ++++-------- .../tests/admin/test_verify_attestation.py | 54 +++++++++++++++++-- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/middleware/admin/verify_attestation.py b/middleware/admin/verify_attestation.py index 514c2d60..dce67200 100644 --- a/middleware/admin/verify_attestation.py +++ b/middleware/admin/verify_attestation.py @@ -23,6 +23,7 @@ import json import hashlib import secp256k1 as ec +import re from .misc import info, head, AdminError from .utils import is_nonempty_hex_string from .certificate import HSMCertificate @@ -46,23 +47,13 @@ def validate_ui_message_header(ui_message): - minor_offset = len(UI_MESSAGE_HEADER) - 1 - if ui_message[:minor_offset] != UI_MESSAGE_HEADER[:minor_offset]: - raise AdminError() - version_minor = ui_message[minor_offset] - # The minor version must be a single digit between 0 and 9 - if version_minor < 48 or version_minor > 57: - raise AdminError() + header = ui_message[:len(UI_MESSAGE_HEADER)] + return re.compile(b"^HSM:UI:5.[0-9]$").match(header) is not None def validate_signer_message_header(signer_message): - minor_offset = len(SIGNER_MESSAGE_HEADER) - 1 - if signer_message[:minor_offset] != SIGNER_MESSAGE_HEADER[:minor_offset]: - raise AdminError() - version_minor = signer_message[minor_offset] - # The minor version must be a single digit between 0 and 9 - if version_minor < 48 or version_minor > 57: - raise AdminError() + header = signer_message[:len(SIGNER_MESSAGE_HEADER)] + return re.compile(b"^HSM:SIGNER:5.[0-9]$").match(header) is not None def do_verify_attestation(options): @@ -142,13 +133,12 @@ def do_verify_attestation(options): ui_message = bytes.fromhex(ui_result[1]) ui_hash = bytes.fromhex(ui_result[2]) mh_len = len(UI_MESSAGE_HEADER) - try: - validate_ui_message_header(ui_message) - except Exception: + if not validate_ui_message_header(ui_message): raise AdminError( f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}") - # Extract UD value, UI public key and signer version from message + # Extract UI version, UD value, UI public key and signer version from message + ui_version = re.match(b"^HSM:UI:(5.[0-9])$", ui_message[:mh_len]).group(1) ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex() ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH].hex() @@ -160,7 +150,6 @@ def do_verify_attestation(options): mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH + SIGNER_HASH_LENGTH + SIGNER_ITERATION_LENGTH] signer_iteration = int.from_bytes(signer_iteration, byteorder='big', signed=False) - ui_version = ui_message[mh_len - 3:mh_len] head( [ @@ -187,19 +176,17 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) mh_len = len(SIGNER_MESSAGE_HEADER) - try: - validate_signer_message_header(signer_message) - except Exception: + if not validate_signer_message_header(signer_message): raise AdminError( f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}") + signer_version = re.match(b"^HSM:SIGNER:(5.[0-9])$", signer_message[:mh_len]).group(1) if signer_message[mh_len:] != pubkeys_hash: reported = signer_message[mh_len:].hex() raise AdminError( f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}" f" but attestation reports {reported}" ) - signer_version = signer_message[mh_len - 3:mh_len] head( ["Signer verified with public keys:"] + pubkeys_output + [ diff --git a/middleware/tests/admin/test_verify_attestation.py b/middleware/tests/admin/test_verify_attestation.py index c651c2c7..81e21965 100644 --- a/middleware/tests/admin/test_verify_attestation.py +++ b/middleware/tests/admin/test_verify_attestation.py @@ -25,7 +25,11 @@ from unittest.mock import Mock, call, patch, mock_open from admin.misc import AdminError from admin.pubkeys import PATHS -from admin.verify_attestation import do_verify_attestation +from admin.verify_attestation import ( + do_verify_attestation, + validate_ui_message_header, + validate_signer_message_header +) import ecdsa import hashlib import logging @@ -33,6 +37,8 @@ logging.disable(logging.CRITICAL) EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" +SIGNER_HEADER = b"HSM:SIGNER:5.1" +UI_HEADER = b"HSM:UI:5.1" @patch("sys.stdout.write") @@ -65,14 +71,14 @@ def setUp(self): ) self.pubkeys_hash = pubkeys_hash.digest() - self.ui_msg = b"HSM:UI:5.1" + \ + self.ui_msg = UI_HEADER + \ bytes.fromhex("aa"*32) + \ bytes.fromhex("bb"*33) + \ bytes.fromhex("cc"*32) + \ bytes.fromhex("0123") self.ui_hash = bytes.fromhex("ee" * 32) - self.signer_msg = b"HSM:SIGNER:5.1" + \ + self.signer_msg = SIGNER_HEADER + \ bytes.fromhex(self.pubkeys_hash.hex()) self.signer_hash = bytes.fromhex("ff" * 32) @@ -278,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self, self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) self.assertEqual(("Invalid Signer attestation: error validating 'signer'"), str(e.exception)) + + def test_validate_ui_message_header_valid_header(self, _): + valid_headers = [ + UI_HEADER, + b"HSM:UI:5.0", + b"HSM:UI:5.5", + b"HSM:UI:5.9", + ] + for header in valid_headers: + ui_message = header + self.ui_msg[len(UI_HEADER):] + self.assertTrue(validate_ui_message_header(ui_message)) + + def test_validate_ui_message_header_invalid_header(self, _): + invalid_headers = [ + SIGNER_HEADER, + b"HSM:UI:4.0", + b"HSM:UI:5.X", + ] + for header in invalid_headers: + ui_message = header + self.ui_msg[len(UI_HEADER):] + self.assertFalse(validate_ui_message_header(ui_message)) + + def test_validate_signer_message_header_valid_header(self, _): + valid_headers = [ + SIGNER_HEADER, + b"HSM:SIGNER:5.0", + b"HSM:SIGNER:5.5", + b"HSM:SIGNER:5.9", + ] + for header in valid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertTrue(validate_signer_message_header(signer_message)) + + def test_validate_signer_message_header_invalid_header(self, _): + invalid_headers = [ + UI_HEADER, + b"HSM:SIGNER:4.0", + b"HSM:SIGNER:5.X", + ] + for header in invalid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertFalse(validate_signer_message_header(signer_message)) From e023df622472a55bc163e0488d4161427e5f8cee Mon Sep 17 00:00:00 2001 From: Italo Sampaio Date: Thu, 5 Sep 2024 19:39:41 -0300 Subject: [PATCH 3/3] Changes after second round of code review - Reduced code duplication by using a pre-compiled regular expression --- middleware/admin/verify_attestation.py | 32 +++++++++---------- .../tests/admin/test_verify_attestation.py | 20 ++++++------ 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/middleware/admin/verify_attestation.py b/middleware/admin/verify_attestation.py index dce67200..3d398267 100644 --- a/middleware/admin/verify_attestation.py +++ b/middleware/admin/verify_attestation.py @@ -29,8 +29,8 @@ from .certificate import HSMCertificate -UI_MESSAGE_HEADER = b"HSM:UI:5.X" -SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.X" +UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])") +SIGNER_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:SIGNER:(5.[0-9])") UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" UD_VALUE_LENGTH = 32 PUBKEY_COMPRESSED_LENGTH = 33 @@ -46,14 +46,12 @@ "dad609" -def validate_ui_message_header(ui_message): - header = ui_message[:len(UI_MESSAGE_HEADER)] - return re.compile(b"^HSM:UI:5.[0-9]$").match(header) is not None +def match_ui_message_header(ui_message): + return UI_MESSAGE_HEADER_REGEX.match(ui_message) -def validate_signer_message_header(signer_message): - header = signer_message[:len(SIGNER_MESSAGE_HEADER)] - return re.compile(b"^HSM:SIGNER:5.[0-9]$").match(header) is not None +def match_signer_message_header(signer_message): + return SIGNER_MESSAGE_HEADER_REGEX.match(signer_message) def do_verify_attestation(options): @@ -132,13 +130,14 @@ def do_verify_attestation(options): ui_message = bytes.fromhex(ui_result[1]) ui_hash = bytes.fromhex(ui_result[2]) - mh_len = len(UI_MESSAGE_HEADER) - if not validate_ui_message_header(ui_message): + mh_match = match_ui_message_header(ui_message) + if mh_match is None: raise AdminError( - f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}") + f"Invalid UI attestation message header: {ui_message.hex()}") + mh_len = len(mh_match.group(0)) # Extract UI version, UD value, UI public key and signer version from message - ui_version = re.match(b"^HSM:UI:(5.[0-9])$", ui_message[:mh_len]).group(1) + ui_version = mh_match.group(1) ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex() ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH].hex() @@ -175,12 +174,12 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) - mh_len = len(SIGNER_MESSAGE_HEADER) - if not validate_signer_message_header(signer_message): + mh_match = match_signer_message_header(signer_message) + if mh_match is None: raise AdminError( - f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}") + f"Invalid Signer attestation message header: {signer_message.hex()}") - signer_version = re.match(b"^HSM:SIGNER:(5.[0-9])$", signer_message[:mh_len]).group(1) + mh_len = len(mh_match.group(0)) if signer_message[mh_len:] != pubkeys_hash: reported = signer_message[mh_len:].hex() raise AdminError( @@ -188,6 +187,7 @@ def do_verify_attestation(options): f" but attestation reports {reported}" ) + signer_version = mh_match.group(1) head( ["Signer verified with public keys:"] + pubkeys_output + [ "", diff --git a/middleware/tests/admin/test_verify_attestation.py b/middleware/tests/admin/test_verify_attestation.py index 81e21965..b3b0286f 100644 --- a/middleware/tests/admin/test_verify_attestation.py +++ b/middleware/tests/admin/test_verify_attestation.py @@ -27,8 +27,8 @@ from admin.pubkeys import PATHS from admin.verify_attestation import ( do_verify_attestation, - validate_ui_message_header, - validate_signer_message_header + match_ui_message_header, + match_signer_message_header ) import ecdsa import hashlib @@ -285,7 +285,7 @@ def test_verify_attestation_invalid_signer_att(self, self.assertEqual(("Invalid Signer attestation: error validating 'signer'"), str(e.exception)) - def test_validate_ui_message_header_valid_header(self, _): + def test_match_ui_message_header_valid_header(self, _): valid_headers = [ UI_HEADER, b"HSM:UI:5.0", @@ -294,9 +294,9 @@ def test_validate_ui_message_header_valid_header(self, _): ] for header in valid_headers: ui_message = header + self.ui_msg[len(UI_HEADER):] - self.assertTrue(validate_ui_message_header(ui_message)) + self.assertTrue(match_ui_message_header(ui_message)) - def test_validate_ui_message_header_invalid_header(self, _): + def test_match_ui_message_header_invalid_header(self, _): invalid_headers = [ SIGNER_HEADER, b"HSM:UI:4.0", @@ -304,9 +304,9 @@ def test_validate_ui_message_header_invalid_header(self, _): ] for header in invalid_headers: ui_message = header + self.ui_msg[len(UI_HEADER):] - self.assertFalse(validate_ui_message_header(ui_message)) + self.assertFalse(match_ui_message_header(ui_message)) - def test_validate_signer_message_header_valid_header(self, _): + def test_match_signer_message_header_valid_header(self, _): valid_headers = [ SIGNER_HEADER, b"HSM:SIGNER:5.0", @@ -315,9 +315,9 @@ def test_validate_signer_message_header_valid_header(self, _): ] for header in valid_headers: signer_message = header + self.signer_msg[len(SIGNER_HEADER):] - self.assertTrue(validate_signer_message_header(signer_message)) + self.assertTrue(match_signer_message_header(signer_message)) - def test_validate_signer_message_header_invalid_header(self, _): + def test_match_signer_message_header_invalid_header(self, _): invalid_headers = [ UI_HEADER, b"HSM:SIGNER:4.0", @@ -325,4 +325,4 @@ def test_validate_signer_message_header_invalid_header(self, _): ] for header in invalid_headers: signer_message = header + self.signer_msg[len(SIGNER_HEADER):] - self.assertFalse(validate_signer_message_header(signer_message)) + self.assertFalse(match_signer_message_header(signer_message))