diff --git a/tests/test_build_linkcheck.py b/tests/test_build_linkcheck.py index d7c925e7c28..7bc4703d673 100644 --- a/tests/test_build_linkcheck.py +++ b/tests/test_build_linkcheck.py @@ -2,13 +2,13 @@ from __future__ import annotations -import base64 import http.server import json import re import textwrap import time import wsgiref.handlers +from base64 import b64encode from datetime import datetime from os import path from queue import Queue @@ -195,16 +195,44 @@ def do_GET(self): ) -def capture_headers_handler(records): - class HeadersDumperHandler(http.server.BaseHTTPRequestHandler): +def custom_handler(valid_credentials=(), success_criteria=lambda _: True): + """ + Returns an HTTP request handler that authenticates the client and then determines + an appropriate HTTP response code, based on caller-provided credentials and optional + success criteria, respectively. + """ + expected_token = None + if valid_credentials: + assert len(valid_credentials) == 2, "expected a pair of strings as credentials" + expected_token = b64encode(":".join(valid_credentials).encode()).decode("utf-8") + del valid_credentials + + class CustomHandler(http.server.BaseHTTPRequestHandler): + def authenticated(method): + def method_if_authenticated(self): + if expected_token is None: + return method(self) + elif self.headers["Authorization"] == f"Basic {expected_token}": + return method(self) + else: + self.send_response(403, "Forbidden") + self.end_headers() + + return method_if_authenticated + + @authenticated def do_HEAD(self): self.do_GET() + @authenticated def do_GET(self): - self.send_response(200, "OK") + if success_criteria(self): + self.send_response(200, "OK") + else: + self.send_response(400, "Bad Request") self.end_headers() - records.append(self.headers.as_string()) - return HeadersDumperHandler + + return CustomHandler @pytest.mark.sphinx( @@ -215,25 +243,29 @@ def do_GET(self): (r'.*local.*', ('user2', 'hunter2')), ]}) def test_auth_header_uses_first_match(app): - records = [] - with http_server(capture_headers_handler(records)): + with http_server(custom_handler(valid_credentials=("user1", "password"))): app.build() - stdout = "\n".join(records) - encoded_auth = base64.b64encode(b'user1:password').decode('ascii') - assert f"Authorization: Basic {encoded_auth}\n" in stdout + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" @pytest.mark.sphinx( 'linkcheck', testroot='linkcheck-localserver', freshenv=True, confoverrides={'linkcheck_auth': [(r'^$', ('user1', 'password'))]}) def test_auth_header_no_match(app): - records = [] - with http_server(capture_headers_handler(records)): + with http_server(custom_handler(valid_credentials=("user1", "password"))): app.build() - stdout = "\n".join(records) - assert "Authorization" not in stdout + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + # TODO: should this test's webserver return HTTP 401 here? + # https://github.com/sphinx-doc/sphinx/issues/11433 + assert content["info"] == "403 Client Error: Forbidden for url: http://localhost:7777/" + assert content["status"] == "broken" @pytest.mark.sphinx( @@ -247,14 +279,20 @@ def test_auth_header_no_match(app): }, }}) def test_linkcheck_request_headers(app): - records = [] - with http_server(capture_headers_handler(records)): + def check_headers(self): + if "X-Secret" in self.headers: + return False + if self.headers["Accept"] != "text/html": + return False + return True + + with http_server(custom_handler(success_criteria=check_headers)): app.build() - stdout = "\n".join(records) - assert "Accept: text/html\n" in stdout - assert "X-Secret" not in stdout - assert "sesami" not in stdout + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" @pytest.mark.sphinx( @@ -264,14 +302,20 @@ def test_linkcheck_request_headers(app): "*": {"X-Secret": "open sesami"}, }}) def test_linkcheck_request_headers_no_slash(app): - records = [] - with http_server(capture_headers_handler(records)): + def check_headers(self): + if "X-Secret" in self.headers: + return False + if self.headers["Accept"] != "application/json": + return False + return True + + with http_server(custom_handler(success_criteria=check_headers)): app.build() - stdout = "\n".join(records) - assert "Accept: application/json\n" in stdout - assert "X-Secret" not in stdout - assert "sesami" not in stdout + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" @pytest.mark.sphinx( @@ -281,13 +325,20 @@ def test_linkcheck_request_headers_no_slash(app): "*": {"X-Secret": "open sesami"}, }}) def test_linkcheck_request_headers_default(app): - records = [] - with http_server(capture_headers_handler(records)): + def check_headers(self): + if self.headers["X-Secret"] != "open sesami": + return False + if self.headers["Accept"] == "application/json": + return False + return True + + with http_server(custom_handler(success_criteria=check_headers)): app.build() - stdout = "\n".join(records) - assert "Accepts: application/json\n" not in stdout - assert "X-Secret: open sesami\n" in stdout + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" def make_redirect_handler(*, support_head):