-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add helpers for JWS message validation #55
Conversation
Warning Rate limit exceeded@jschlyter has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 14 minutes and 3 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (2)
WalkthroughThis update introduces a new module for JSON Web Signature (JWS) verification. The changes include the creation of a new file that defines a subclass for handling key resolution, functions to verify JWS messages, and a command-line entry point. Additionally, the package version has been incremented and a new optional dependency ( Changes
Sequence Diagram(s)sequenceDiagram
participant U as User
participant M as main()
participant V as verify_jws_with_keys()
participant R as ResolvedJWKSet
participant KR as KeyResolver/Key Server
U->>M: Provide API URL & JWS message
M->>M: Parse arguments & configure logging
M->>V: Call verify_jws_with_keys(jws, keys)
V->>R: Request key by kid (get_key)
R->>KR: Query for public key
KR-->>R: Return public key
R-->>V: Return resolved key(s)
V-->>M: Return verified key if signature valid
M->>U: Log decoded payload and key details
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (4)
tests/test_jws.py (2)
26-30
: Consider testing error scenarios.While the happy path is well tested, consider adding test cases for:
- Invalid/malformed PEM responses
- HTTP errors (404, 500, etc.)
- Network timeouts
32-36
: Consider testing with different payload types.The test could be more robust by testing different payload types (strings, numbers, nested objects, etc.) and edge cases like empty payloads.
dnstapir/jws.py (2)
15-24
: Consider caching resolved keys.The
ResolvedJWKSet
could benefit from caching resolved keys to avoid repeated HTTP requests for the same kid.class ResolvedJWKSet(JWKSet): def __init__(self, key_resolver: KeyResolver): super().__init__() self.key_resolver = key_resolver + self._cache = {} def get_key(self, kid: str) -> JWK: + if kid in self._cache: + return self._cache[kid] + key = JWK.from_pyca(self.key_resolver.resolve_public_key(kid)) + self._cache[kid] = key + return key
27-42
: Add type hints for protected_header.The
protected_header
variable should have a type hint since it's parsed from JSON.def verify_jws_with_keys(jws: JWS, keys: JWKSet) -> JWK: """Verify JWS using keys and return key (or raise JWKeyNotFound)""" - protected_header = json.loads(jws.objects["protected"]) + protected_header: dict[str, str] = json.loads(jws.objects["protected"])
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
poetry.lock
is excluded by!**/*.lock
📒 Files selected for processing (3)
dnstapir/jws.py
(1 hunks)pyproject.toml
(3 hunks)tests/test_jws.py
(1 hunks)
🔇 Additional comments (3)
tests/test_jws.py (1)
14-24
: LGTM! Good test setup with Ed25519 keys.The test setup is well-structured with clear key generation and initialization steps.
pyproject.toml (2)
36-45
: LGTM! Appropriate grouping in extras.The jwcrypto dependency is correctly added to the keymanager extras group.
3-3
: Verify jwcrypto version compatibility.The version constraint
^1.5.6
for jwcrypto allows updates to any 1.x.x version. Please verify compatibility with the latest versions.Also applies to: 24-24
✅ Verification successful
JWCrypto Dependency Compatibility Looks Good
- The latest version on PyPI is 1.5.6, which meets the
^1.5.6
version constraint.- No breaking changes were reported in the changelog.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check latest versions and compatibility of jwcrypto # Get latest version from PyPI echo "Latest version from PyPI:" curl -s https://pypi.org/pypi/jwcrypto/json | jq -r '.info.version' # Check for breaking changes in changelog echo -e "\nBreaking changes in recent versions:" curl -s https://raw.githubusercontent.com/latchset/jwcrypto/master/CHANGELOG.md | grep -A 5 "Breaking changes"Length of output: 336
tests/test_jws.py
Outdated
# Set up key resolver | ||
client_database_base_url = "https://keys/api/v1/node/{key_id}/public_key" | ||
key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url) | ||
keyset = ResolvedJWKSet(key_resolver=key_resolver) | ||
|
||
# Verify message (public key lookup via resolver) | ||
jws = JWS() | ||
jws.deserialize(message) | ||
verified_jwk = verify_jws_with_keys(jws, keyset) | ||
assert verified_jwk.thumbprint() == public_jwk.thumbprint() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add assertions for error cases.
The verification test should also assert that:
- Invalid signatures are rejected
- Missing kid in header is handled
- Non-existent keys are handled
dnstapir/jws.py
Outdated
def main() -> None: | ||
"""Main function""" | ||
|
||
parser = argparse.ArgumentParser(description="JWS Verifier") | ||
|
||
parser.add_argument("--nodeman", help="Nodeman API") | ||
parser.add_argument("--debug", action="store_true", help="Enable debugging") | ||
parser.add_argument("message", help="JWS message") | ||
|
||
args = parser.parse_args() | ||
|
||
if args.debug: | ||
logging.basicConfig(level=logging.DEBUG) | ||
else: | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
client_database_base_url = urljoin(args.nodeman, "/api/v1/node/{key_id}/public_key") | ||
|
||
key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url) | ||
keyset = ResolvedJWKSet(key_resolver=key_resolver) | ||
|
||
jws = JWS() | ||
jws.deserialize(args.message) | ||
key = verify_jws_with_keys(jws, keyset) | ||
|
||
logging.info("Found JWK: %s", json.dumps(key.export(as_dict=True))) | ||
logging.info("Message: %s", jws.payload.decode()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for CLI arguments.
The CLI should handle:
- Invalid JWS messages
- Invalid/missing Nodeman API URL
- Network errors during key resolution
def main() -> None:
"""Main function"""
parser = argparse.ArgumentParser(description="JWS Verifier")
parser.add_argument("--nodeman", help="Nodeman API")
parser.add_argument("--debug", action="store_true", help="Enable debugging")
parser.add_argument("message", help="JWS message")
args = parser.parse_args()
+ if not args.nodeman:
+ parser.error("--nodeman is required")
+
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
client_database_base_url = urljoin(args.nodeman, "/api/v1/node/{key_id}/public_key")
- key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url)
- keyset = ResolvedJWKSet(key_resolver=key_resolver)
+ try:
+ key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url)
+ keyset = ResolvedJWKSet(key_resolver=key_resolver)
- jws = JWS()
- jws.deserialize(args.message)
- key = verify_jws_with_keys(jws, keyset)
+ jws = JWS()
+ jws.deserialize(args.message)
+ key = verify_jws_with_keys(jws, keyset)
- logging.info("Found JWK: %s", json.dumps(key.export(as_dict=True)))
- logging.info("Message: %s", jws.payload.decode())
+ logging.info("Found JWK: %s", json.dumps(key.export(as_dict=True)))
+ logging.info("Message: %s", jws.payload.decode())
+ except Exception as e:
+ logging.error("Error: %s", str(e))
+ sys.exit(1)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def main() -> None: | |
"""Main function""" | |
parser = argparse.ArgumentParser(description="JWS Verifier") | |
parser.add_argument("--nodeman", help="Nodeman API") | |
parser.add_argument("--debug", action="store_true", help="Enable debugging") | |
parser.add_argument("message", help="JWS message") | |
args = parser.parse_args() | |
if args.debug: | |
logging.basicConfig(level=logging.DEBUG) | |
else: | |
logging.basicConfig(level=logging.INFO) | |
client_database_base_url = urljoin(args.nodeman, "/api/v1/node/{key_id}/public_key") | |
key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url) | |
keyset = ResolvedJWKSet(key_resolver=key_resolver) | |
jws = JWS() | |
jws.deserialize(args.message) | |
key = verify_jws_with_keys(jws, keyset) | |
logging.info("Found JWK: %s", json.dumps(key.export(as_dict=True))) | |
logging.info("Message: %s", jws.payload.decode()) | |
def main() -> None: | |
"""Main function""" | |
parser = argparse.ArgumentParser(description="JWS Verifier") | |
parser.add_argument("--nodeman", help="Nodeman API") | |
parser.add_argument("--debug", action="store_true", help="Enable debugging") | |
parser.add_argument("message", help="JWS message") | |
args = parser.parse_args() | |
if not args.nodeman: | |
parser.error("--nodeman is required") | |
if args.debug: | |
logging.basicConfig(level=logging.DEBUG) | |
else: | |
logging.basicConfig(level=logging.INFO) | |
client_database_base_url = urljoin(args.nodeman, "/api/v1/node/{key_id}/public_key") | |
try: | |
key_resolver = UrlKeyResolver(client_database_base_url=client_database_base_url) | |
keyset = ResolvedJWKSet(key_resolver=key_resolver) | |
jws = JWS() | |
jws.deserialize(args.message) | |
key = verify_jws_with_keys(jws, keyset) | |
logging.info("Found JWK: %s", json.dumps(key.export(as_dict=True))) | |
logging.info("Message: %s", jws.payload.decode()) | |
except Exception as e: | |
logging.error("Error: %s", str(e)) | |
sys.exit(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
dnstapir/jws.py (1)
50-81
: Consider broader exception handling inmain()
.Key resolution or signature verification errors can occur (network failures, malformed JWS, invalid keys). Encapsulating the verification logic in a try-except block here can provide user-friendly CLI error messages and proper exit codes, per your past review notes.
Would you like me to propose a complete try-except pattern in
main()
to handle these scenarios comprehensively?
🧹 Nitpick comments (1)
dnstapir/jws.py (1)
32-48
: Refine exception handling for invalid signatures.Currently, if all keys fail, the function raises
JWKeyNotFound
, which may be misleading to callers who expect an explicit indication of an invalid signature. Consider raising a more descriptive error or providing a dedicated exception when signatures fail but the key ID is present.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
dnstapir/jws.py
(1 hunks)
🔇 Additional comments (2)
dnstapir/jws.py (2)
15-19
: Consider thread safety for_cache
inResolvedJWKSet
.If this class might be used in a concurrent environment, access to
_cache
could lead to race conditions. Consider adding a suitable thread-safe mechanism (e.g., locks or thread-safe dictionaries) to avoid potential data races.Would you like me to generate a shell script to search for multithreading usage or concurrency hints across the codebase to confirm if thread safety measures are needed?
50-81
: Add error handling for missing--nodeman
.When
args.nodeman
is omitted,urljoin(None, "/api/v1/node/{key_id}/public_key")
may produce an unusable URL or cause runtime errors. Enforcing that--nodeman
is required ensures clearer error messages and avoids obscure failures.
def get_key(self, kid: str) -> JWK: | ||
if kid in self._cache: | ||
return self._cache[kid] | ||
key = JWK.from_pyca(self.key_resolver.resolve_public_key(kid)) # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle possible exceptions during public key resolution.
self.key_resolver.resolve_public_key(kid)
may fail for network or parsing issues. Wrapping this in a try-except block could improve resilience. You can catch library-specific exceptions (e.g., network failures) and raise user-friendly errors.
+ try:
+ key_material = self.key_resolver.resolve_public_key(kid)
+ except Exception as err:
+ raise RuntimeError(f"Unable to resolve public key for kid={kid}: {err}") from err
+
- key = JWK.from_pyca(self.key_resolver.resolve_public_key(kid)) # type: ignore
+ key = JWK.from_pyca(key_material) # type: ignore
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
key = JWK.from_pyca(self.key_resolver.resolve_public_key(kid)) # type: ignore | |
try: | |
key_material = self.key_resolver.resolve_public_key(kid) | |
except Exception as err: | |
raise RuntimeError(f"Unable to resolve public key for kid={kid}: {err}") from err | |
key = JWK.from_pyca(key_material) # type: ignore |
Summary by CodeRabbit