From f8ce003f61f114391ae53fa78180de0b12eb2b51 Mon Sep 17 00:00:00 2001 From: thinkst-pieter <89250089+thinkst-pieter@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:25:19 +0200 Subject: [PATCH] Only allow valid base32 characters for fake file system token source data (#631) * only allow valid base32 characters * update test binary string used * Drop the = from the regex as we don't expect it. also clean decode some more --- canarytokens/tokens.py | 8 ++-- tests/units/test_tokens.py | 89 +++++++++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 16 deletions(-) diff --git a/canarytokens/tokens.py b/canarytokens/tokens.py index 69bd87fc9..a9e5dd8ff 100644 --- a/canarytokens/tokens.py +++ b/canarytokens/tokens.py @@ -60,7 +60,7 @@ re.IGNORECASE, ) windows_fake_fs_pattern = re.compile( - r"u([0-9]*)\.f([A-Za-z0-9]*)\.i([A-Za-z0-9]*)\.", re.IGNORECASE + r"u([0-9]*)\.f([A-Z2-7]*)\.i([A-Z2-7]*)\.", re.IGNORECASE ) # to validate decoded sql username, not a data extractor: @@ -263,7 +263,7 @@ def _windows_fake_fs(matches: Match[AnyStr]) -> dict[str, dict[str, AnyStr]]: "windows_fake_fs_process_name": "(not obtained)", } if invocation_id: - data["windows_fake_fs_invocation_id"] = invocation_id[0:] + data["windows_fake_fs_invocation_id"] = invocation_id def correct_base32_padding(b32_data): padding_count = len(b32_data) % 8 @@ -272,10 +272,10 @@ def correct_base32_padding(b32_data): return b32_data if file_name and file_name != "f": - b32_data = correct_base32_padding(file_name[0:].upper()) + b32_data = correct_base32_padding(file_name.upper()) data["windows_fake_fs_file_name"] = base64.b32decode(b32_data).decode() if process_name and process_name != "i": - b32_data = correct_base32_padding(process_name[0:].upper()) + b32_data = correct_base32_padding(process_name.upper()) data["windows_fake_fs_process_name"] = base64.b32decode(b32_data).decode() return {"src_data": data} diff --git a/tests/units/test_tokens.py b/tests/units/test_tokens.py index e463058b4..c4b009b8c 100644 --- a/tests/units/test_tokens.py +++ b/tests/units/test_tokens.py @@ -90,48 +90,113 @@ def test_cmd_process_pattern( assert data["src_data"].get("cmd_invocation_id") == cmd_invocation_id +# string was chosen so it would use each of the 32 characters used by base 32 +ALL_BASE_32_CHARS_BYTES = b"WP*BHy@Sa9`M:6'F?u0G':~?\\7<7o`[mQ~?" +ALL_BASE_32_CHARS_STRING = base64.b32encode(ALL_BASE_32_CHARS_BYTES).decode() + + @pytest.mark.parametrize( - "query, invocation_id, file_name, process_name, ", + "query, invocation_id, file_name, process_name, pass_regex", [ - ( + pytest.param( "u7595.fMRXWGIDCFZSG6Y3Y.iMV4HA3DPOJSXELTFPBSQ.someid.sometoken.com", "7595", "doc b.docx", "explorer.exe", + True, + id="Valid Token", ), - ( + pytest.param( # ensure lowercase also works "u7595.fmrxwgidcfzsg6y3y.imv4ha3dpojsxeltfpbsq.someid.sometoken.com", "7595", "doc b.docx", "explorer.exe", + True, + id="Valid Token Lowercase", ), - ( + pytest.param( "u7595.f.iMV4HA3DPOJSXELTFPBSQ.someid.sometoken.com", "7595", "(not obtained)", "explorer.exe", + True, + id="Valid Token No File", ), - ( + pytest.param( "u7595.fMRXWGIDCFZSG6Y3Y.i.someid.sometoken.com", "7595", "doc b.docx", "(not obtained)", + True, + id="Valid Token No Process", ), - ( + pytest.param( "u7595.f.i.someid.sometoken.com", "7595", "(not obtained)", "(not obtained)", + True, + id="Valid Token No File or Process", + ), + pytest.param( + # Test invalid base32 char 0 is ignored + "u.f0.i.someid.sometoken.com", + None, + None, + None, + False, + id="Invalid Base32 Char 0", + ), + pytest.param( + # Test invalid base32 char 1 is ignored + "u.f1.i.someid.sometoken.com", + None, + None, + None, + False, + id="Invalid Base32 Char 1", + ), + pytest.param( + # Test invalid base32 char 8 is ignored + "u.f8.i.someid.sometoken.com", + None, + None, + None, + False, + id="Invalid Base32 Char 8", + ), + pytest.param( + # Test invalid base32 char 9 is ignored + "u.f9.i.someid.sometoken.com", + None, + None, + None, + False, + id="Invalid Base32 Char 9", + ), + pytest.param( + # Test all the valid base32 chars + f"u1.f{ALL_BASE_32_CHARS_STRING}.i{ALL_BASE_32_CHARS_STRING}.someid.sometoken.com", + "1", + ALL_BASE_32_CHARS_BYTES.decode(), + ALL_BASE_32_CHARS_BYTES.decode(), + True, + id="All valid Base32 Chars", ), ], ) -def test_windows_fake_fs_pattern(query, invocation_id, file_name, process_name): - m = t.windows_fake_fs_pattern.match(query) - data = t.Canarytoken._windows_fake_fs(m) - assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id.lower() - assert data["src_data"]["windows_fake_fs_file_name"] == file_name.lower() - assert data["src_data"]["windows_fake_fs_process_name"] == process_name.lower() +def test_windows_fake_fs_pattern( + query, invocation_id, file_name, process_name, pass_regex +): + matches = t.windows_fake_fs_pattern.match(query) + assert (matches is not None) == pass_regex + + if pass_regex: + data = t.Canarytoken._windows_fake_fs(matches) + assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id + assert data["src_data"]["windows_fake_fs_file_name"] == file_name + assert data["src_data"]["windows_fake_fs_process_name"] == process_name def test_windows_fake_fs_base32_padding():