Skip to content

Commit

Permalink
do revoked check with a full cert chain when there is one
Browse files Browse the repository at this point in the history
  • Loading branch information
johanlundberg committed Dec 9, 2024
1 parent 0ae933b commit 6ba11d1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
31 changes: 23 additions & 8 deletions src/auth_server/cert_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def cert_within_validity_period(cert: Certificate) -> bool:
return True


@lru_cache()
def cert_signed_by_ca(cert: Certificate) -> Optional[str]:
"""
check if the cert is signed by any on our loaded CA certs
Expand All @@ -68,19 +69,32 @@ def cert_signed_by_ca(cert: Certificate) -> Optional[str]:
return None


async def is_cert_revoked(cert: Certificate, ca_name: str) -> bool:
@lru_cache()
def get_chain(cert: Certificate) -> list[Certificate]:
chain = list()
# please mypy
_cert: Certificate | None = cert
while ca_name := cert_signed_by_ca(_cert):
_cert = load_ca_certs().get(ca_name)
if _cert:
chain.append(_cert)
if _cert is None or ca_name == _cert.issuer.rfc4514_string(): # no cert of signed by self
break
return chain


async def is_cert_revoked(cert: Certificate) -> bool:
"""
check if cert is revoked
"""
ca_cert = load_ca_certs().get(ca_name)
if ca_cert is None:
raise ConfigurationError(f"CA cert {ca_name} not found")
cert_fingerprint = rfc8705_fingerprint(cert)
ca_chain = get_chain(cert)
if not ca_chain:
raise ConfigurationError(f"No CA cert found for certificate {cert_fingerprint}")
try:
return is_revoked(
cert=PKIToolCertificate.from_cryptography(cert=cert), chain=Chain.from_cryptography([ca_cert])
)
return is_revoked(cert=PKIToolCertificate.from_cryptography(cert=cert), chain=Chain.from_cryptography(ca_chain))
except (PKIToolsError, ValueError) as e:
logger.error(f"Certificate {rfc8705_fingerprint(cert)} failed revoke check: {e}")
logger.error(f"Certificate {cert_fingerprint} failed revoke check: {e}")
return True


Expand Down Expand Up @@ -248,5 +262,6 @@ def serialize_certificate(cert: Certificate, encoding: Encoding = Encoding.PEM)
return public_bytes.decode("ascii")


@lru_cache()
def rfc8705_fingerprint(cert: Certificate):
return b64encode(cert.fingerprint(algorithm=SHA256())).decode("ascii")
2 changes: 1 addition & 1 deletion src/auth_server/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ async def validate_proof(self) -> Optional[GrantResponse]:
if ca_name is None:
raise StopTransactionException(status_code=401, detail="client certificate not signed by CA")

if await is_cert_revoked(cert=client_cert, ca_name=ca_name) is True:
if await is_cert_revoked(cert=client_cert) is True:
raise StopTransactionException(status_code=401, detail="client certificate revoked")

# set client CN and issuer CN in state for use in claims
Expand Down
8 changes: 4 additions & 4 deletions src/auth_server/tests/test_ca_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ def _load_cert(self, filename: str) -> Certificate:

async def test_cert_is_revoked(self):
parameters = [
("bolag_a.crt", "CN=ExpiTrust Test CA v8,O=Expisoft AB,C=SE", False),
("bolag_b.crt", "CN=ExpiTrust Test CA v8,O=Expisoft AB,C=SE", True),
("bolag_a.crt", False),
("bolag_b.crt", True),
]
for cert_name, ca_name, revoked_status in parameters:
for cert_name, revoked_status in parameters:
cert = self._load_cert(filename=cert_name)
assert await is_cert_revoked(cert, ca_name) is revoked_status, f"{cert_name} should be {not revoked_status}"
assert await is_cert_revoked(cert) is revoked_status, f"{cert_name} should be {not revoked_status}"

0 comments on commit 6ba11d1

Please sign in to comment.