Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix verify_attestation.py to accept distinct versions for UI and Signer #197

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions middleware/admin/verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import json
import hashlib
import secp256k1 as ec
import re
from .misc import info, head, AdminError
from .utils import is_nonempty_hex_string
from .certificate import HSMCertificate


UI_MESSAGE_HEADER = b"HSM:UI:5.1"
SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.1"
UI_MESSAGE_HEADER = b"HSM:UI:5.X"
SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.X"
amendelzon marked this conversation as resolved.
Show resolved Hide resolved
UI_DERIVATION_PATH = "m/44'/0'/0'/0/0"
UD_VALUE_LENGTH = 32
PUBKEY_COMPRESSED_LENGTH = 33
Expand All @@ -45,6 +46,16 @@
"dad609"


def validate_ui_message_header(ui_message):
header = ui_message[:len(UI_MESSAGE_HEADER)]
return re.compile(b"^HSM:UI:5.[0-9]$").match(header) is not None


def validate_signer_message_header(signer_message):
header = signer_message[:len(SIGNER_MESSAGE_HEADER)]
return re.compile(b"^HSM:SIGNER:5.[0-9]$").match(header) is not None


def do_verify_attestation(options):
head("### -> Verify UI and Signer attestations", fill="#")

Expand Down Expand Up @@ -122,11 +133,12 @@ def do_verify_attestation(options):
ui_message = bytes.fromhex(ui_result[1])
ui_hash = bytes.fromhex(ui_result[2])
mh_len = len(UI_MESSAGE_HEADER)
if ui_message[:mh_len] != UI_MESSAGE_HEADER:
if not validate_ui_message_header(ui_message):
raise AdminError(
f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}")

# Extract UD value, UI public key and signer version from message
# Extract UI version, UD value, UI public key and signer version from message
ui_version = re.match(b"^HSM:UI:(5.[0-9])$", ui_message[:mh_len]).group(1)
ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex()
ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH +
PUBKEY_COMPRESSED_LENGTH].hex()
Expand All @@ -147,6 +159,7 @@ def do_verify_attestation(options):
f"Authorized signer hash: {signer_hash}",
f"Authorized signer iteration: {signer_iteration}",
f"Installed UI hash: {ui_hash.hex()}",
f"Installed UI version: {ui_version.decode()}",
],
fill="-",
)
Expand All @@ -163,10 +176,11 @@ def do_verify_attestation(options):
signer_message = bytes.fromhex(signer_result[1])
signer_hash = bytes.fromhex(signer_result[2])
mh_len = len(SIGNER_MESSAGE_HEADER)
if signer_message[:mh_len] != SIGNER_MESSAGE_HEADER:
if not validate_signer_message_header(signer_message):
raise AdminError(
f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}")

signer_version = re.match(b"^HSM:SIGNER:(5.[0-9])$", signer_message[:mh_len]).group(1)
if signer_message[mh_len:] != pubkeys_hash:
reported = signer_message[mh_len:].hex()
raise AdminError(
Expand All @@ -179,6 +193,7 @@ def do_verify_attestation(options):
"",
f"Hash: {signer_message[mh_len:].hex()}",
f"Installed Signer hash: {signer_hash.hex()}",
f"Installed Signer version: {signer_version.decode()}",
],
fill="-",
)
56 changes: 53 additions & 3 deletions middleware/tests/admin/test_verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
from unittest.mock import Mock, call, patch, mock_open
from admin.misc import AdminError
from admin.pubkeys import PATHS
from admin.verify_attestation import do_verify_attestation
from admin.verify_attestation import (
do_verify_attestation,
validate_ui_message_header,
validate_signer_message_header
)
import ecdsa
import hashlib
import logging

logging.disable(logging.CRITICAL)

EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0"
SIGNER_HEADER = b"HSM:SIGNER:5.1"
UI_HEADER = b"HSM:UI:5.1"


@patch("sys.stdout.write")
Expand Down Expand Up @@ -65,14 +71,14 @@ def setUp(self):
)
self.pubkeys_hash = pubkeys_hash.digest()

self.ui_msg = b"HSM:UI:5.1" + \
self.ui_msg = UI_HEADER + \
bytes.fromhex("aa"*32) + \
bytes.fromhex("bb"*33) + \
bytes.fromhex("cc"*32) + \
bytes.fromhex("0123")
self.ui_hash = bytes.fromhex("ee" * 32)

self.signer_msg = b"HSM:SIGNER:5.1" + \
self.signer_msg = SIGNER_HEADER + \
bytes.fromhex(self.pubkeys_hash.hex())
self.signer_hash = bytes.fromhex("ff" * 32)

Expand Down Expand Up @@ -108,6 +114,7 @@ def test_verify_attestation(self,
f"Authorized signer hash: {'cc'*32}",
"Authorized signer iteration: 291",
f"Installed UI hash: {'ee'*32}",
"Installed UI version: 5.1",
],
fill="-",
)
Expand All @@ -118,6 +125,7 @@ def test_verify_attestation(self,
"",
f"Hash: {self.pubkeys_hash.hex()}",
f"Installed Signer hash: {'ff'*32}",
"Installed Signer version: 5.1",
],
fill="-",
)
Expand Down Expand Up @@ -276,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self,
self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list)
self.assertEqual(("Invalid Signer attestation: error validating 'signer'"),
str(e.exception))

def test_validate_ui_message_header_valid_header(self, _):
valid_headers = [
UI_HEADER,
b"HSM:UI:5.0",
b"HSM:UI:5.5",
b"HSM:UI:5.9",
]
for header in valid_headers:
ui_message = header + self.ui_msg[len(UI_HEADER):]
self.assertTrue(validate_ui_message_header(ui_message))

def test_validate_ui_message_header_invalid_header(self, _):
invalid_headers = [
SIGNER_HEADER,
b"HSM:UI:4.0",
b"HSM:UI:5.X",
]
for header in invalid_headers:
ui_message = header + self.ui_msg[len(UI_HEADER):]
self.assertFalse(validate_ui_message_header(ui_message))

def test_validate_signer_message_header_valid_header(self, _):
valid_headers = [
SIGNER_HEADER,
b"HSM:SIGNER:5.0",
b"HSM:SIGNER:5.5",
b"HSM:SIGNER:5.9",
]
for header in valid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertTrue(validate_signer_message_header(signer_message))

def test_validate_signer_message_header_invalid_header(self, _):
invalid_headers = [
UI_HEADER,
b"HSM:SIGNER:4.0",
b"HSM:SIGNER:5.X",
]
for header in invalid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertFalse(validate_signer_message_header(signer_message))
Loading