Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RFC 2069 mode digest authentication #3045

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed

* Respect the `http1` argument while configuring proxy transports. (#3023)
* Fix RFC 2069 mode digest authentication. (#3045)

## 0.26.0 (20th December, 2023)

Expand Down
7 changes: 4 additions & 3 deletions httpx/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,18 @@ def digest(data: bytes) -> bytes:

qop = self._resolve_qop(challenge.qop, request=request)
if qop is None:
# Following RFC 2069
digest_data = [HA1, challenge.nonce, HA2]
else:
digest_data = [challenge.nonce, nc_value, cnonce, qop, HA2]
key_digest = b":".join(digest_data)
# Following RFC 2617/7616
digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]

format_args = {
"username": self._username,
"realm": challenge.realm,
"nonce": challenge.nonce,
"uri": path,
"response": digest(b":".join((HA1, key_digest))),
"response": digest(b":".join(digest_data)),
"algorithm": challenge.algorithm.encode(),
}
if challenge.opaque:
Expand Down
165 changes: 165 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,168 @@ def test_digest_auth_setting_cookie_in_request():
)
with pytest.raises(StopIteration):
flow.send(response)


def test_digest_auth_rfc_2069():
# Example from https://datatracker.ietf.org/doc/html/rfc2069#section-2.4
# with corrected response from https://www.rfc-editor.org/errata/eid749

auth = httpx.DigestAuth(username="Mufasa", password="CircleOfLife")
request = httpx.Request("GET", "https://www.example.com/dir/index.html")

# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers

# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="testrealm@host.com", '
'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", '
'opaque="5ccc069c403ebaf9f0171e9517f40e41"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="testrealm@host.com"' in request.headers["Authorization"]
assert (
'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093"' in request.headers["Authorization"]
)
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert (
'opaque="5ccc069c403ebaf9f0171e9517f40e41"' in request.headers["Authorization"]
)
assert (
'response="1949323746fe6a43ef61f9606e7febea"'
in request.headers["Authorization"]
)

# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)


def test_digest_auth_rfc_7616_md5(monkeypatch):
# Example from https://datatracker.ietf.org/doc/html/rfc7616#section-3.9.1

def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()

auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)

request = httpx.Request("GET", "https://www.example.com/dir/index.html")

# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers

# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"algorithm=MD5, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=MD5" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="8ca523f5e9506fed4657c9700eebdbec"'
in request.headers["Authorization"]
)
tomchristie marked this conversation as resolved.
Show resolved Hide resolved

# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)


def test_digest_auth_rfc_7616_sha_256(monkeypatch):
# Example from https://datatracker.ietf.org/doc/html/rfc7616#section-3.9.1

def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()

auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)

request = httpx.Request("GET", "https://www.example.com/dir/index.html")

# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers

# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"algorithm=SHA-256, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=SHA-256" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="753927fa0e85d155564e2e272a28d1802ca10daf4496794697cf8db5856cb6c1"'
in request.headers["Authorization"]
)

# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)