Skip to content

Commit

Permalink
Refactor HTTP handlers in linkcheck builder tests (#11426)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayaddison committed Jul 20, 2023
1 parent 1b08535 commit bef7fc2
Showing 1 changed file with 83 additions and 32 deletions.
115 changes: 83 additions & 32 deletions tests/test_build_linkcheck.py
Expand Up @@ -2,13 +2,13 @@

from __future__ import annotations

import base64
import http.server
import json
import re
import textwrap
import time
import wsgiref.handlers
from base64 import b64encode
from datetime import datetime
from os import path
from queue import Queue
Expand Down Expand Up @@ -195,16 +195,44 @@ def do_GET(self):
)


def capture_headers_handler(records):
class HeadersDumperHandler(http.server.BaseHTTPRequestHandler):
def custom_handler(valid_credentials=(), success_criteria=lambda _: True):
"""
Returns an HTTP request handler that authenticates the client and then determines
an appropriate HTTP response code, based on caller-provided credentials and optional
success criteria, respectively.
"""
expected_token = None
if valid_credentials:
assert len(valid_credentials) == 2, "expected a pair of strings as credentials"
expected_token = b64encode(":".join(valid_credentials).encode()).decode("utf-8")
del valid_credentials

class CustomHandler(http.server.BaseHTTPRequestHandler):
def authenticated(method):
def method_if_authenticated(self):
if expected_token is None:
return method(self)
elif self.headers["Authorization"] == f"Basic {expected_token}":
return method(self)
else:
self.send_response(403, "Forbidden")
self.end_headers()

return method_if_authenticated

@authenticated
def do_HEAD(self):
self.do_GET()

@authenticated
def do_GET(self):
self.send_response(200, "OK")
if success_criteria(self):
self.send_response(200, "OK")
else:
self.send_response(400, "Bad Request")
self.end_headers()
records.append(self.headers.as_string())
return HeadersDumperHandler

return CustomHandler


@pytest.mark.sphinx(
Expand All @@ -215,25 +243,29 @@ def do_GET(self):
(r'.*local.*', ('user2', 'hunter2')),
]})
def test_auth_header_uses_first_match(app):
records = []
with http_server(capture_headers_handler(records)):
with http_server(custom_handler(valid_credentials=("user1", "password"))):
app.build()

stdout = "\n".join(records)
encoded_auth = base64.b64encode(b'user1:password').decode('ascii')
assert f"Authorization: Basic {encoded_auth}\n" in stdout
with open(app.outdir / "output.json", encoding="utf-8") as fp:
content = json.load(fp)

assert content["status"] == "working"


@pytest.mark.sphinx(
'linkcheck', testroot='linkcheck-localserver', freshenv=True,
confoverrides={'linkcheck_auth': [(r'^$', ('user1', 'password'))]})
def test_auth_header_no_match(app):
records = []
with http_server(capture_headers_handler(records)):
with http_server(custom_handler(valid_credentials=("user1", "password"))):
app.build()

stdout = "\n".join(records)
assert "Authorization" not in stdout
with open(app.outdir / "output.json", encoding="utf-8") as fp:
content = json.load(fp)

# TODO: should this test's webserver return HTTP 401 here?
# https://github.com/sphinx-doc/sphinx/issues/11433
assert content["info"] == "403 Client Error: Forbidden for url: http://localhost:7777/"
assert content["status"] == "broken"


@pytest.mark.sphinx(
Expand All @@ -247,14 +279,20 @@ def test_auth_header_no_match(app):
},
}})
def test_linkcheck_request_headers(app):
records = []
with http_server(capture_headers_handler(records)):
def check_headers(self):
if "X-Secret" in self.headers:
return False
if self.headers["Accept"] != "text/html":
return False
return True

with http_server(custom_handler(success_criteria=check_headers)):
app.build()

stdout = "\n".join(records)
assert "Accept: text/html\n" in stdout
assert "X-Secret" not in stdout
assert "sesami" not in stdout
with open(app.outdir / "output.json", encoding="utf-8") as fp:
content = json.load(fp)

assert content["status"] == "working"


@pytest.mark.sphinx(
Expand All @@ -264,14 +302,20 @@ def test_linkcheck_request_headers(app):
"*": {"X-Secret": "open sesami"},
}})
def test_linkcheck_request_headers_no_slash(app):
records = []
with http_server(capture_headers_handler(records)):
def check_headers(self):
if "X-Secret" in self.headers:
return False
if self.headers["Accept"] != "application/json":
return False
return True

with http_server(custom_handler(success_criteria=check_headers)):
app.build()

stdout = "\n".join(records)
assert "Accept: application/json\n" in stdout
assert "X-Secret" not in stdout
assert "sesami" not in stdout
with open(app.outdir / "output.json", encoding="utf-8") as fp:
content = json.load(fp)

assert content["status"] == "working"


@pytest.mark.sphinx(
Expand All @@ -281,13 +325,20 @@ def test_linkcheck_request_headers_no_slash(app):
"*": {"X-Secret": "open sesami"},
}})
def test_linkcheck_request_headers_default(app):
records = []
with http_server(capture_headers_handler(records)):
def check_headers(self):
if self.headers["X-Secret"] != "open sesami":
return False
if self.headers["Accept"] == "application/json":
return False
return True

with http_server(custom_handler(success_criteria=check_headers)):
app.build()

stdout = "\n".join(records)
assert "Accepts: application/json\n" not in stdout
assert "X-Secret: open sesami\n" in stdout
with open(app.outdir / "output.json", encoding="utf-8") as fp:
content = json.load(fp)

assert content["status"] == "working"


def make_redirect_handler(*, support_head):
Expand Down

0 comments on commit bef7fc2

Please sign in to comment.