diff --git a/aiohttp/_http_parser.pyx b/aiohttp/_http_parser.pyx index 34f6c8c87d6..d53be214fa5 100644 --- a/aiohttp/_http_parser.pyx +++ b/aiohttp/_http_parser.pyx @@ -117,6 +117,8 @@ cdef class HttpParser: self._csettings.on_body = cb_on_body self._csettings.on_message_begin = cb_on_message_begin self._csettings.on_message_complete = cb_on_message_complete + self._csettings.on_chunk_header = cb_on_chunk_header + self._csettings.on_chunk_complete = cb_on_chunk_complete self._last_error = None @@ -208,6 +210,11 @@ cdef class HttpParser: self._payload.feed_eof() self._payload = None + cdef _on_chunk_header(self): + self._payload.begin_http_chunk_receiving() + + cdef _on_chunk_complete(self): + self._payload.end_http_chunk_receiving() ### Public API ### @@ -436,6 +443,28 @@ cdef int cb_on_message_complete(cparser.http_parser* parser) except -1: return 0 +cdef int cb_on_chunk_header(cparser.http_parser* parser) except -1: + cdef HttpParser pyparser = parser.data + try: + pyparser._on_chunk_header() + except BaseException as exc: + pyparser._last_error = exc + return -1 + else: + return 0 + + +cdef int cb_on_chunk_complete(cparser.http_parser* parser) except -1: + cdef HttpParser pyparser = parser.data + try: + pyparser._on_chunk_complete() + except BaseException as exc: + pyparser._last_error = exc + return -1 + else: + return 0 + + cdef parser_error_from_errno(cparser.http_errno errno): cdef bytes desc = cparser.http_errno_description(errno) diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index eb6f0fd141a..575baee203d 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -538,6 +538,7 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'): else: self._chunk = ChunkState.PARSE_CHUNKED_CHUNK self._chunk_size = size + self.payload.begin_http_chunk_receiving() else: self._chunk_tail = chunk return False, None @@ -547,11 +548,8 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'): required = self._chunk_size chunk_len = len(chunk) - if required >= chunk_len: + if required > chunk_len: self._chunk_size = required - chunk_len - if self._chunk_size == 0: - self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF - self.payload.feed_data(chunk, chunk_len) return False, None else: @@ -559,6 +557,7 @@ def feed_data(self, chunk, SEP=b'\r\n', CHUNK_EXT=b';'): self.payload.feed_data(chunk[:required], required) chunk = chunk[required:] self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF + self.payload.end_http_chunk_receiving() # toss the CRLF at the end of the chunk if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF: @@ -644,6 +643,12 @@ def feed_eof(self): self.out.feed_eof() + def begin_http_chunk_receiving(self): + self.out.begin_http_chunk_receiving() + + def end_http_chunk_receiving(self): + self.out.end_http_chunk_receiving() + HttpRequestParser = HttpRequestParserPy HttpResponseParser = HttpResponseParserPy diff --git a/aiohttp/streams.py b/aiohttp/streams.py index beb6a9d118c..3604bc96fbe 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -40,6 +40,14 @@ def __anext__(self): raise StopAsyncIteration # NOQA return rv + class ChunkTupleAsyncStreamIterator(AsyncStreamIterator): + @asyncio.coroutine + def __anext__(self): + rv = yield from self.read_func() + if rv == (b'', False): + raise StopAsyncIteration # NOQA + return rv + class AsyncStreamReaderMixin: @@ -58,20 +66,21 @@ def iter_chunked(self, n): return AsyncStreamIterator(lambda: self.read(n)) def iter_any(self): - """Returns an asynchronous iterator that yields slices of data - as they come. + """Returns an asynchronous iterator that yields all the available + data as soon as it is received Python-3.5 available for Python 3.5+ only """ return AsyncStreamIterator(self.readany) def iter_chunks(self): - """Returns an asynchronous iterator that yields chunks of the - size as received by the server. + """Returns an asynchronous iterator that yields chunks of data + as they are received by the server. The yielded objects are tuples + of (bytes, bool) as returned by the StreamReader.readchunk method. Python-3.5 available for Python 3.5+ only """ - return AsyncStreamIterator(self.readchunk) + return ChunkTupleAsyncStreamIterator(self.readchunk) class StreamReader(AsyncStreamReaderMixin): @@ -96,6 +105,8 @@ def __init__(self, limit=DEFAULT_LIMIT, timer=None, loop=None): loop = asyncio.get_event_loop() self._loop = loop self._size = 0 + self._cursor = 0 + self._http_chunk_splits = None self._buffer = collections.deque() self._buffer_offset = 0 self._eof = False @@ -200,6 +211,7 @@ def unread_data(self, data): self._buffer[0] = self._buffer[0][self._buffer_offset:] self._buffer_offset = 0 self._size += len(data) + self._cursor -= len(data) self._buffer.appendleft(data) def feed_data(self, data): @@ -218,6 +230,18 @@ def feed_data(self, data): if not waiter.done(): waiter.set_result(False) + def begin_http_chunk_receiving(self): + if self._http_chunk_splits is None: + self._http_chunk_splits = [] + + def end_http_chunk_receiving(self): + if self._http_chunk_splits is None: + raise RuntimeError("Called end_chunk_receiving without calling " + "begin_chunk_receiving first") + if not self._http_chunk_splits or \ + self._http_chunk_splits[-1] != self.total_bytes: + self._http_chunk_splits.append(self.total_bytes) + @asyncio.coroutine def _wait(self, func_name): # StreamReader uses a future to link the protocol feed_data() method @@ -320,16 +344,34 @@ def readany(self): @asyncio.coroutine def readchunk(self): + """Returns a tuple of (data, end_of_http_chunk). When chunked transfer + encoding is used, end_of_http_chunk is a boolean indicating if the end + of the data corresponds to the end of a HTTP chunk , otherwise it is + always False. + """ if self._exception is not None: raise self._exception if not self._buffer and not self._eof: + if (self._http_chunk_splits and + self._cursor == self._http_chunk_splits[0]): + # end of http chunk without available data + self._http_chunk_splits = self._http_chunk_splits[1:] + return (b"", True) yield from self._wait('readchunk') - if self._buffer: - return self._read_nowait_chunk(-1) + if not self._buffer: + # end of file + return (b"", False) + elif self._http_chunk_splits is not None: + while self._http_chunk_splits: + pos = self._http_chunk_splits[0] + self._http_chunk_splits = self._http_chunk_splits[1:] + if pos > self._cursor: + return (self._read_nowait(pos-self._cursor), True) + return (self._read_nowait(-1), False) else: - return b"" + return (self._read_nowait_chunk(-1), False) @asyncio.coroutine def readexactly(self, n): @@ -378,6 +420,7 @@ def _read_nowait_chunk(self, n): data = self._buffer.popleft() self._size -= len(data) + self._cursor += len(data) return data def _read_nowait(self, n): @@ -438,7 +481,7 @@ def readany(self): @asyncio.coroutine def readchunk(self): - return b'' + return (b'', False) @asyncio.coroutine def readexactly(self, n): diff --git a/changes/2150.feature b/changes/2150.feature new file mode 100644 index 00000000000..92ae2c47b4e --- /dev/null +++ b/changes/2150.feature @@ -0,0 +1 @@ +Make the HTTP client able to return HTTP chunks when chunked transfer encoding is used. diff --git a/docs/client_reference.rst b/docs/client_reference.rst index 12c44d1efec..00d60c9ee82 100644 --- a/docs/client_reference.rst +++ b/docs/client_reference.rst @@ -991,7 +991,10 @@ Response object .. attribute:: content - Payload stream, contains response's BODY (:class:`StreamReader`). + Payload stream, which contains response's BODY (:class:`StreamReader`). + It supports various reading methods depending on the expected format. + When chunked transfer encoding is used by the server, allows retrieving + the actual http chunks. Reading from the stream may raise :exc:`aiohttp.ClientPayloadError` if the response object is diff --git a/docs/streams.rst b/docs/streams.rst index 111b661834f..e99d3ef51b1 100644 --- a/docs/streams.rst +++ b/docs/streams.rst @@ -74,6 +74,21 @@ Reading Methods :return bytes: the given line +.. comethod:: StreamReader.readchunk() + + Read a chunk of data as it was received by the server. + + Returns a tuple of (data, end_of_HTTP_chunk). + + When chunked transfer encoding is used, end_of_HTTP_chunk is a :class:`bool` + indicating if the end of the data corresponds to the end of a HTTP chunk, + otherwise it is always ``False``. + + :return tuple[bytes, bool]: a chunk of data and a :class:`bool` that is ``True`` + when the end of the returned chunk corresponds + to the end of a HTTP chunk. + + Asynchronous Iteration Support ------------------------------ @@ -109,9 +124,20 @@ size limit and over any available data. Iterates over data chunks as received from the server:: - async for data in response.content.iter_chunks(): + async for data, _ in response.content.iter_chunks(): print(data) + If chunked transfer encoding is used, the original http chunks formatting + can be retrieved by reading the second element of returned tuples:: + + buffer = b"" + + async for data, end_of_http_chunk in response.content.iter_chunks(): + buffer += data + if end_of_http_chunk: + print(buffer) + buffer = b"" + Helpers ------- diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 25226727fc0..483ed9dac13 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -71,16 +71,18 @@ def test_readany_resume_paused(self): def test_readchunk(self): r = self._make_one() r.feed_data(b'data', 4) - res = self.loop.run_until_complete(r.readchunk()) + res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk()) self.assertEqual(res, b'data') + self.assertFalse(end_of_http_chunk) self.assertFalse(r._protocol.resume_reading.called) def test_readchunk_resume_paused(self): r = self._make_one() r._protocol._reading_paused = True r.feed_data(b'data', 4) - res = self.loop.run_until_complete(r.readchunk()) + res, end_of_http_chunk = self.loop.run_until_complete(r.readchunk()) self.assertEqual(res, b'data') + self.assertFalse(end_of_http_chunk) self.assertTrue(r._protocol.resume_reading.called) def test_readexactly(self): diff --git a/tests/test_http_parser.py b/tests/test_http_parser.py index 3d677bde884..ac2011ed2ea 100644 --- a/tests/test_http_parser.py +++ b/tests/test_http_parser.py @@ -488,6 +488,7 @@ def test_http_request_chunked_payload(parser): parser.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n') assert b'dataline' == b''.join(d for d in payload._buffer) + assert [4, 8] == payload._http_chunk_splits assert payload.is_eof() @@ -502,6 +503,7 @@ def test_http_request_chunked_payload_and_next_message(parser): b'transfer-encoding: chunked\r\n\r\n') assert b'dataline' == b''.join(d for d in payload._buffer) + assert [4, 8] == payload._http_chunk_splits assert payload.is_eof() assert len(messages) == 1 @@ -521,14 +523,17 @@ def test_http_request_chunked_payload_chunks(parser): parser.feed_data(b'\n4') parser.feed_data(b'\r') parser.feed_data(b'\n') - parser.feed_data(b'line\r\n0\r\n') + parser.feed_data(b'li') + parser.feed_data(b'ne\r\n0\r\n') parser.feed_data(b'test: test\r\n') assert b'dataline' == b''.join(d for d in payload._buffer) + assert [4, 8] == payload._http_chunk_splits assert not payload.is_eof() parser.feed_data(b'\r\n') assert b'dataline' == b''.join(d for d in payload._buffer) + assert [4, 8] == payload._http_chunk_splits assert payload.is_eof() @@ -541,6 +546,7 @@ def test_parse_chunked_payload_chunk_extension(parser): b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest: test\r\n\r\n') assert b'dataline' == b''.join(d for d in payload._buffer) + assert [4, 8] == payload._http_chunk_splits assert payload.is_eof() diff --git a/tests/test_py35/test_streams_35.py b/tests/test_py35/test_streams_35.py index 1644e230708..4798d90a681 100644 --- a/tests/test_py35/test_streams_35.py +++ b/tests/test_py35/test_streams_35.py @@ -81,3 +81,24 @@ async def test_stream_reader_iter(loop): async for raw in create_stream(loop): assert raw == next(it) pytest.raises(StopIteration, next, it) + + +async def test_stream_reader_iter_chunks_no_chunked_encoding(loop): + it = iter([b'line1\nline2\nline3\n']) + async for data, end_of_chunk in create_stream(loop).iter_chunks(): + assert (data, end_of_chunk) == (next(it), False) + pytest.raises(StopIteration, next, it) + + +async def test_stream_reader_iter_chunks_chunked_encoding(loop): + stream = streams.StreamReader(loop=loop) + for line in DATA.splitlines(keepends=True): + stream.begin_http_chunk_receiving() + stream.feed_data(line) + stream.end_http_chunk_receiving() + stream.feed_eof() + + it = iter([b'line1\n', b'line2\n', b'line3\n']) + async for data, end_of_chunk in stream.iter_chunks(): + assert (data, end_of_chunk) == (next(it), True) + pytest.raises(StopIteration, next, it) diff --git a/tests/test_streams.py b/tests/test_streams.py index 31bea1f1a84..d233b3e4564 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -555,14 +555,17 @@ def cb(): stream.feed_eof() self.loop.call_soon(cb) - data = self.loop.run_until_complete(stream.readchunk()) + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) self.assertEqual(b'chunk1', data) + self.assertFalse(end_of_chunk) - data = self.loop.run_until_complete(stream.readchunk()) + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) self.assertEqual(b'chunk2', data) + self.assertFalse(end_of_chunk) - data = self.loop.run_until_complete(stream.readchunk()) + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) self.assertEqual(b'', data) + self.assertFalse(end_of_chunk) def test_readchunk_wait_eof(self): stream = self._make_one() @@ -572,10 +575,109 @@ def cb(): stream.feed_eof() asyncio.Task(cb(), loop=self.loop) - data = self.loop.run_until_complete(stream.readchunk()) + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) self.assertEqual(b"", data) + self.assertFalse(end_of_chunk) self.assertTrue(stream.is_eof()) + def test_begin_and_end_chunk_receiving(self): + stream = self._make_one() + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part1') + stream.feed_data(b'part2') + stream.end_http_chunk_receiving() + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'part1part2', data) + self.assertTrue(end_of_chunk) + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part3') + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'part3', data) + self.assertFalse(end_of_chunk) + + stream.end_http_chunk_receiving() + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'', data) + self.assertTrue(end_of_chunk) + + stream.feed_eof() + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'', data) + self.assertFalse(end_of_chunk) + + def test_end_chunk_receiving_without_begin(self): + stream = self._make_one() + self.assertRaises(RuntimeError, stream.end_http_chunk_receiving) + + def test_readchunk_with_unread(self): + """Test that stream.unread does not break controlled chunk receiving. + """ + stream = self._make_one() + + # Send 2 chunks + stream.begin_http_chunk_receiving() + stream.feed_data(b'part1') + stream.end_http_chunk_receiving() + stream.begin_http_chunk_receiving() + stream.feed_data(b'part2') + stream.end_http_chunk_receiving() + + # Read only one chunk + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + + # Try to unread a part of the first chunk + stream.unread_data(b'rt1') + + # The end_of_chunk signal was already received for the first chunk, + # so we receive up to the second one + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'rt1part2', data) + self.assertTrue(end_of_chunk) + + # Unread a part of the second chunk + stream.unread_data(b'rt2') + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'rt2', data) + # end_of_chunk was already received for this chunk + self.assertFalse(end_of_chunk) + + stream.feed_eof() + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'', data) + self.assertFalse(end_of_chunk) + + def test_readchunk_with_other_read_calls(self): + """Test that stream.readchunk works when other read calls are made on + the stream. + """ + stream = self._make_one() + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part1') + stream.end_http_chunk_receiving() + stream.begin_http_chunk_receiving() + stream.feed_data(b'part2') + stream.end_http_chunk_receiving() + + data = self.loop.run_until_complete(stream.read(7)) + self.assertEqual(b'part1pa', data) + + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'rt2', data) + self.assertTrue(end_of_chunk) + + stream.feed_eof() + data, end_of_chunk = self.loop.run_until_complete(stream.readchunk()) + self.assertEqual(b'', data) + self.assertFalse(end_of_chunk) + def test___repr__(self): stream = self._make_one() self.assertEqual("", repr(stream)) @@ -647,7 +749,7 @@ def test_empty_stream_reader(self): self.assertEqual( self.loop.run_until_complete(s.readany()), b'') self.assertEqual( - self.loop.run_until_complete(s.readchunk()), b'') + self.loop.run_until_complete(s.readchunk()), (b'', False)) self.assertRaises( asyncio.IncompleteReadError, self.loop.run_until_complete, s.readexactly(10))