Skip to content

Commit

Permalink
Only allow valid base32 characters for fake file system token source …
Browse files Browse the repository at this point in the history
…data (#631)

* only allow valid base32 characters

* update test binary string used

* Drop the = from the regex as we don't expect it. also clean decode some more
  • Loading branch information
thinkst-pieter authored Dec 10, 2024
1 parent 7e83cf6 commit f8ce003
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 16 deletions.
8 changes: 4 additions & 4 deletions canarytokens/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
re.IGNORECASE,
)
windows_fake_fs_pattern = re.compile(
r"u([0-9]*)\.f([A-Za-z0-9]*)\.i([A-Za-z0-9]*)\.", re.IGNORECASE
r"u([0-9]*)\.f([A-Z2-7]*)\.i([A-Z2-7]*)\.", re.IGNORECASE
)

# to validate decoded sql username, not a data extractor:
Expand Down Expand Up @@ -263,7 +263,7 @@ def _windows_fake_fs(matches: Match[AnyStr]) -> dict[str, dict[str, AnyStr]]:
"windows_fake_fs_process_name": "(not obtained)",
}
if invocation_id:
data["windows_fake_fs_invocation_id"] = invocation_id[0:]
data["windows_fake_fs_invocation_id"] = invocation_id

def correct_base32_padding(b32_data):
padding_count = len(b32_data) % 8
Expand All @@ -272,10 +272,10 @@ def correct_base32_padding(b32_data):
return b32_data

if file_name and file_name != "f":
b32_data = correct_base32_padding(file_name[0:].upper())
b32_data = correct_base32_padding(file_name.upper())
data["windows_fake_fs_file_name"] = base64.b32decode(b32_data).decode()
if process_name and process_name != "i":
b32_data = correct_base32_padding(process_name[0:].upper())
b32_data = correct_base32_padding(process_name.upper())
data["windows_fake_fs_process_name"] = base64.b32decode(b32_data).decode()

return {"src_data": data}
Expand Down
89 changes: 77 additions & 12 deletions tests/units/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,48 +90,113 @@ def test_cmd_process_pattern(
assert data["src_data"].get("cmd_invocation_id") == cmd_invocation_id


# string was chosen so it would use each of the 32 characters used by base 32
ALL_BASE_32_CHARS_BYTES = b"WP*BHy@Sa9`M:6'F?u0G':~?\\7<7o`[mQ~?"
ALL_BASE_32_CHARS_STRING = base64.b32encode(ALL_BASE_32_CHARS_BYTES).decode()


@pytest.mark.parametrize(
"query, invocation_id, file_name, process_name, ",
"query, invocation_id, file_name, process_name, pass_regex",
[
(
pytest.param(
"u7595.fMRXWGIDCFZSG6Y3Y.iMV4HA3DPOJSXELTFPBSQ.someid.sometoken.com",
"7595",
"doc b.docx",
"explorer.exe",
True,
id="Valid Token",
),
(
pytest.param(
# ensure lowercase also works
"u7595.fmrxwgidcfzsg6y3y.imv4ha3dpojsxeltfpbsq.someid.sometoken.com",
"7595",
"doc b.docx",
"explorer.exe",
True,
id="Valid Token Lowercase",
),
(
pytest.param(
"u7595.f.iMV4HA3DPOJSXELTFPBSQ.someid.sometoken.com",
"7595",
"(not obtained)",
"explorer.exe",
True,
id="Valid Token No File",
),
(
pytest.param(
"u7595.fMRXWGIDCFZSG6Y3Y.i.someid.sometoken.com",
"7595",
"doc b.docx",
"(not obtained)",
True,
id="Valid Token No Process",
),
(
pytest.param(
"u7595.f.i.someid.sometoken.com",
"7595",
"(not obtained)",
"(not obtained)",
True,
id="Valid Token No File or Process",
),
pytest.param(
# Test invalid base32 char 0 is ignored
"u.f0.i.someid.sometoken.com",
None,
None,
None,
False,
id="Invalid Base32 Char 0",
),
pytest.param(
# Test invalid base32 char 1 is ignored
"u.f1.i.someid.sometoken.com",
None,
None,
None,
False,
id="Invalid Base32 Char 1",
),
pytest.param(
# Test invalid base32 char 8 is ignored
"u.f8.i.someid.sometoken.com",
None,
None,
None,
False,
id="Invalid Base32 Char 8",
),
pytest.param(
# Test invalid base32 char 9 is ignored
"u.f9.i.someid.sometoken.com",
None,
None,
None,
False,
id="Invalid Base32 Char 9",
),
pytest.param(
# Test all the valid base32 chars
f"u1.f{ALL_BASE_32_CHARS_STRING}.i{ALL_BASE_32_CHARS_STRING}.someid.sometoken.com",
"1",
ALL_BASE_32_CHARS_BYTES.decode(),
ALL_BASE_32_CHARS_BYTES.decode(),
True,
id="All valid Base32 Chars",
),
],
)
def test_windows_fake_fs_pattern(query, invocation_id, file_name, process_name):
m = t.windows_fake_fs_pattern.match(query)
data = t.Canarytoken._windows_fake_fs(m)
assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id.lower()
assert data["src_data"]["windows_fake_fs_file_name"] == file_name.lower()
assert data["src_data"]["windows_fake_fs_process_name"] == process_name.lower()
def test_windows_fake_fs_pattern(
query, invocation_id, file_name, process_name, pass_regex
):
matches = t.windows_fake_fs_pattern.match(query)
assert (matches is not None) == pass_regex

if pass_regex:
data = t.Canarytoken._windows_fake_fs(matches)
assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id
assert data["src_data"]["windows_fake_fs_file_name"] == file_name
assert data["src_data"]["windows_fake_fs_process_name"] == process_name


def test_windows_fake_fs_base32_padding():
Expand Down

0 comments on commit f8ce003

Please sign in to comment.