Skip to content

Commit

Permalink
attestations: allow multiple attestations per dist
Browse files Browse the repository at this point in the history
Signed-off-by: Facundo Tuesca <[email protected]>
  • Loading branch information
facutuesca committed Nov 20, 2024
1 parent dd30340 commit fbfeca4
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
52 changes: 48 additions & 4 deletions tests/unit/attestations/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,72 @@ def test_parse_attestations_fails_malformed_attestation(self, metrics, db_reques
in metrics.increment.calls
)

def test_parse_attestations_fails_multiple_attestations(
def test_parse_attestations_fails_multiple_attestations_exceeds_limit(
self, metrics, db_request, dummy_attestation
):
integrity_service = services.IntegrityService(
metrics=metrics,
session=db_request.db,
)

max_attestations = len(services.SUPPORTED_ATTESTATION_TYPES)

db_request.oidc_publisher = pretend.stub(attestation_identity=pretend.stub())
db_request.POST["attestations"] = TypeAdapter(list[Attestation]).dump_json(
[dummy_attestation, dummy_attestation]
[dummy_attestation] * (max_attestations + 1)
)
with pytest.raises(
AttestationUploadError,
match=f"A maximum of {max_attestations} attestations per file are "
f"supported",
):
integrity_service.parse_attestations(
db_request,
pretend.stub(),
)

assert (
pretend.call(
"warehouse.upload.attestations.failed_limit_multiple_attestations"
)
in metrics.increment.calls
)

def test_parse_attestations_fails_multiple_attestations_same_predicate(
self, metrics, monkeypatch, db_request, dummy_attestation
):
integrity_service = services.IntegrityService(
metrics=metrics,
session=db_request.db,
)
max_attestations = len(services.SUPPORTED_ATTESTATION_TYPES)
db_request.oidc_publisher = pretend.stub(
attestation_identity=pretend.stub(),
)
db_request.oidc_claims = {"sha": "somesha"}
db_request.POST["attestations"] = TypeAdapter(list[Attestation]).dump_json(
[dummy_attestation] * max_attestations
)

monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())
monkeypatch.setattr(
Attestation, "verify", lambda *args: (AttestationType.PYPI_PUBLISH_V1, {})
)

with pytest.raises(
AttestationUploadError, match="Only a single attestation per file"
AttestationUploadError,
match="Multiple attestations for the same file with the same predicate "
"type: AttestationType.PYPI_PUBLISH_V1",
):
integrity_service.parse_attestations(
db_request,
pretend.stub(),
)

assert (
pretend.call("warehouse.upload.attestations.failed_multiple_attestations")
pretend.call(
"warehouse.upload.attestations.failed_duplicate_predicate_type"
)
in metrics.increment.calls
)

Expand Down
26 changes: 20 additions & 6 deletions warehouse/attestations/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,14 @@ def _extract_attestations_from_request(request: Request) -> list[Attestation]:
"Malformed attestations: an empty attestation set is not permitted"
)

# This is a temporary constraint; multiple attestations per file will
# be supported in the future.
if len(attestations) > 1:
metrics.increment("warehouse.upload.attestations.failed_multiple_attestations")

# We currently allow at most one attestation per predicate type
max_attestations = len(SUPPORTED_ATTESTATION_TYPES)
if len(attestations) > max_attestations:
metrics.increment(
"warehouse.upload.attestations.failed_limit_multiple_attestations"
)
raise AttestationUploadError(
"Only a single attestation per file is supported",
f"A maximum of {max_attestations} attestations per file are supported",
)

return attestations
Expand Down Expand Up @@ -158,6 +159,8 @@ def parse_attestations(
# Sanity-checked above.
expected_identity = request.oidc_publisher.attestation_identity

seen_predicate_types: set[str] = set()

for attestation_model in attestations:
try:
predicate_type, _ = attestation_model.verify(
Expand Down Expand Up @@ -190,6 +193,17 @@ def parse_attestations(
f"Attestation with unsupported predicate type: {predicate_type}",
)

if predicate_type in seen_predicate_types:
self.metrics.increment(
"warehouse.upload.attestations.failed_duplicate_predicate_type"
)
raise AttestationUploadError(
f"Multiple attestations for the same file with the same predicate "
f"type: {predicate_type}",
)

seen_predicate_types.add(predicate_type)

return attestations

def build_provenance(
Expand Down

0 comments on commit fbfeca4

Please sign in to comment.