Skip to content

Commit 2f2f8d5

Browse files
daeyeonRafaelGSS
authored andcommittedSep 26, 2022
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: #44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent a3360b1 commit 2f2f8d5

File tree

6 files changed

+343
-53
lines changed

6 files changed

+343
-53
lines changed
 

‎doc/api/webstreams.md

+4
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@ is active.
299299

300300
<!-- YAML
301301
added: v16.5.0
302+
changes:
303+
- version: REPLACEME
304+
pr-url: https://github.com/nodejs/node/pull/44505
305+
description: Support teeing a readable byte stream.
302306
-->
303307

304308
* Returns: {ReadableStream\[]}

‎lib/internal/webstreams/readablestream.js

+293-11
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ const {
9595
ArrayBufferViewGetByteOffset,
9696
ArrayBufferGetByteLength,
9797
AsyncIterator,
98+
cloneAsUint8Array,
9899
copyArrayBuffer,
99100
customInspect,
100101
dequeueValue,
@@ -215,6 +216,7 @@ class ReadableStream {
215216
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
216217
this[kState] = {
217218
disturbed: false,
219+
reader: undefined,
218220
state: 'readable',
219221
storedError: undefined,
220222
stream: undefined,
@@ -1111,7 +1113,6 @@ class ReadableByteStreamController {
11111113
chunk);
11121114
}
11131115
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1114-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11151116
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11161117
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11171118
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1122,11 +1123,7 @@ class ReadableByteStreamController {
11221123
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11231124
if (this[kState].stream[kState].state !== 'readable')
11241125
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1125-
readableByteStreamControllerEnqueue(
1126-
this,
1127-
chunkBuffer,
1128-
chunkByteLength,
1129-
chunkByteOffset);
1126+
readableByteStreamControllerEnqueue(this, chunk);
11301127
}
11311128

11321129
/**
@@ -1430,6 +1427,13 @@ function readableStreamPipeTo(
14301427
}
14311428

14321429
function readableStreamTee(stream, cloneForBranch2) {
1430+
if (isReadableByteStreamController(stream[kState].controller)) {
1431+
return readableByteStreamTee(stream);
1432+
}
1433+
return readableStreamDefaultTee(stream, cloneForBranch2);
1434+
}
1435+
1436+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14331437
const reader = new ReadableStreamDefaultReader(stream);
14341438
let reading = false;
14351439
let canceled1 = false;
@@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15241528
return [branch1, branch2];
15251529
}
15261530

1531+
function readableByteStreamTee(stream) {
1532+
assert(isReadableStream(stream));
1533+
assert(isReadableByteStreamController(stream[kState].controller));
1534+
1535+
let reader = new ReadableStreamDefaultReader(stream);
1536+
let reading = false;
1537+
let readAgainForBranch1 = false;
1538+
let readAgainForBranch2 = false;
1539+
let canceled1 = false;
1540+
let canceled2 = false;
1541+
let reason1;
1542+
let reason2;
1543+
let branch1;
1544+
let branch2;
1545+
const cancelDeferred = createDeferredPromise();
1546+
1547+
function forwardReaderError(thisReader) {
1548+
PromisePrototypeThen(
1549+
thisReader[kState].close.promise,
1550+
undefined,
1551+
(error) => {
1552+
if (thisReader !== reader) {
1553+
return;
1554+
}
1555+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1556+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1557+
if (!canceled1 || !canceled2) {
1558+
cancelDeferred.resolve();
1559+
}
1560+
}
1561+
);
1562+
}
1563+
1564+
function pullWithDefaultReader() {
1565+
if (isReadableStreamBYOBReader(reader)) {
1566+
readableStreamBYOBReaderRelease(reader);
1567+
reader = new ReadableStreamDefaultReader(stream);
1568+
forwardReaderError(reader);
1569+
}
1570+
1571+
const readRequest = {
1572+
[kChunk](chunk) {
1573+
queueMicrotask(() => {
1574+
readAgainForBranch1 = false;
1575+
readAgainForBranch2 = false;
1576+
const chunk1 = chunk;
1577+
let chunk2 = chunk;
1578+
1579+
if (!canceled1 && !canceled2) {
1580+
try {
1581+
chunk2 = cloneAsUint8Array(chunk);
1582+
} catch (error) {
1583+
readableByteStreamControllerError(
1584+
branch1[kState].controller,
1585+
error
1586+
);
1587+
readableByteStreamControllerError(
1588+
branch2[kState].controller,
1589+
error
1590+
);
1591+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1592+
return;
1593+
}
1594+
}
1595+
if (!canceled1) {
1596+
readableByteStreamControllerEnqueue(
1597+
branch1[kState].controller,
1598+
chunk1
1599+
);
1600+
}
1601+
if (!canceled2) {
1602+
readableByteStreamControllerEnqueue(
1603+
branch2[kState].controller,
1604+
chunk2
1605+
);
1606+
}
1607+
reading = false;
1608+
1609+
if (readAgainForBranch1) {
1610+
pull1Algorithm();
1611+
} else if (readAgainForBranch2) {
1612+
pull2Algorithm();
1613+
}
1614+
});
1615+
},
1616+
[kClose]() {
1617+
reading = false;
1618+
1619+
if (!canceled1) {
1620+
readableByteStreamControllerClose(branch1[kState].controller);
1621+
}
1622+
if (!canceled2) {
1623+
readableByteStreamControllerClose(branch2[kState].controller);
1624+
}
1625+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1626+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1627+
}
1628+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1629+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1630+
}
1631+
if (!canceled1 || !canceled2) {
1632+
cancelDeferred.resolve();
1633+
}
1634+
},
1635+
[kError]() {
1636+
reading = false;
1637+
},
1638+
};
1639+
1640+
readableStreamDefaultReaderRead(reader, readRequest);
1641+
}
1642+
1643+
function pullWithBYOBReader(view, forBranch2) {
1644+
if (isReadableStreamDefaultReader(reader)) {
1645+
readableStreamDefaultReaderRelease(reader);
1646+
reader = new ReadableStreamBYOBReader(stream);
1647+
forwardReaderError(reader);
1648+
}
1649+
1650+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1651+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1652+
const readIntoRequest = {
1653+
[kChunk](chunk) {
1654+
queueMicrotask(() => {
1655+
readAgainForBranch1 = false;
1656+
readAgainForBranch2 = false;
1657+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1658+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1659+
1660+
if (!otherCanceled) {
1661+
let clonedChunk;
1662+
1663+
try {
1664+
clonedChunk = cloneAsUint8Array(chunk);
1665+
} catch (error) {
1666+
readableByteStreamControllerError(
1667+
byobBranch[kState].controller,
1668+
error
1669+
);
1670+
readableByteStreamControllerError(
1671+
otherBranch[kState].controller,
1672+
error
1673+
);
1674+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1675+
return;
1676+
}
1677+
if (!byobCanceled) {
1678+
readableByteStreamControllerRespondWithNewView(
1679+
byobBranch[kState].controller,
1680+
chunk
1681+
);
1682+
}
1683+
1684+
readableByteStreamControllerEnqueue(
1685+
otherBranch[kState].controller,
1686+
clonedChunk
1687+
);
1688+
} else if (!byobCanceled) {
1689+
readableByteStreamControllerRespondWithNewView(
1690+
byobBranch[kState].controller,
1691+
chunk
1692+
);
1693+
}
1694+
reading = false;
1695+
1696+
if (readAgainForBranch1) {
1697+
pull1Algorithm();
1698+
} else if (readAgainForBranch2) {
1699+
pull2Algorithm();
1700+
}
1701+
});
1702+
},
1703+
[kClose](chunk) {
1704+
reading = false;
1705+
1706+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1707+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1708+
1709+
if (!byobCanceled) {
1710+
readableByteStreamControllerClose(byobBranch[kState].controller);
1711+
}
1712+
if (!otherCanceled) {
1713+
readableByteStreamControllerClose(otherBranch[kState].controller);
1714+
}
1715+
if (chunk !== undefined) {
1716+
if (!byobCanceled) {
1717+
readableByteStreamControllerRespondWithNewView(
1718+
byobBranch[kState].controller,
1719+
chunk
1720+
);
1721+
}
1722+
if (
1723+
!otherCanceled &&
1724+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1725+
) {
1726+
readableByteStreamControllerRespond(
1727+
otherBranch[kState].controller,
1728+
0
1729+
);
1730+
}
1731+
}
1732+
if (!byobCanceled || !otherCanceled) {
1733+
cancelDeferred.resolve();
1734+
}
1735+
},
1736+
[kError]() {
1737+
reading = false;
1738+
},
1739+
};
1740+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1741+
}
1742+
1743+
function pull1Algorithm() {
1744+
if (reading) {
1745+
readAgainForBranch1 = true;
1746+
return PromiseResolve();
1747+
}
1748+
reading = true;
1749+
1750+
const byobRequest = branch1[kState].controller.byobRequest;
1751+
if (byobRequest === null) {
1752+
pullWithDefaultReader();
1753+
} else {
1754+
pullWithBYOBReader(byobRequest[kState].view, false);
1755+
}
1756+
return PromiseResolve();
1757+
}
1758+
1759+
function pull2Algorithm() {
1760+
if (reading) {
1761+
readAgainForBranch2 = true;
1762+
return PromiseResolve();
1763+
}
1764+
reading = true;
1765+
1766+
const byobRequest = branch2[kState].controller.byobRequest;
1767+
if (byobRequest === null) {
1768+
pullWithDefaultReader();
1769+
} else {
1770+
pullWithBYOBReader(byobRequest[kState].view, true);
1771+
}
1772+
return PromiseResolve();
1773+
}
1774+
1775+
function cancel1Algorithm(reason) {
1776+
canceled1 = true;
1777+
reason1 = reason;
1778+
if (canceled2) {
1779+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1780+
}
1781+
return cancelDeferred.promise;
1782+
}
1783+
1784+
function cancel2Algorithm(reason) {
1785+
canceled2 = true;
1786+
reason2 = reason;
1787+
if (canceled1) {
1788+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1789+
}
1790+
return cancelDeferred.promise;
1791+
}
1792+
1793+
branch1 = new ReadableStream({
1794+
type: 'bytes',
1795+
pull: pull1Algorithm,
1796+
cancel: cancel1Algorithm,
1797+
});
1798+
branch2 = new ReadableStream({
1799+
type: 'bytes',
1800+
pull: pull2Algorithm,
1801+
cancel: cancel2Algorithm,
1802+
});
1803+
1804+
forwardReaderError(reader);
1805+
1806+
return [branch1, branch2];
1807+
}
1808+
15271809
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15281810
const {
15291811
buffer,
@@ -2317,18 +2599,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
23172599
desc.bytesFilled += size;
23182600
}
23192601

2320-
function readableByteStreamControllerEnqueue(
2321-
controller,
2322-
buffer,
2323-
byteLength,
2324-
byteOffset) {
2602+
function readableByteStreamControllerEnqueue(controller, chunk) {
23252603
const {
23262604
closeRequested,
23272605
pendingPullIntos,
23282606
queue,
23292607
stream,
23302608
} = controller[kState];
23312609

2610+
const buffer = ArrayBufferViewGetBuffer(chunk);
2611+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2612+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2613+
23322614
if (closeRequested || stream[kState].state !== 'readable')
23332615
return;
23342616

‎lib/internal/webstreams/util.js

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const {
44
ArrayBufferPrototype,
5+
ArrayBufferPrototypeSlice,
56
ArrayPrototypePush,
67
ArrayPrototypeShift,
78
AsyncIteratorPrototype,
@@ -112,6 +113,15 @@ function ArrayBufferGetByteLength(view) {
112113
return ReflectGet(ArrayBufferPrototype, 'byteLength', view);
113114
}
114115

116+
function cloneAsUint8Array(view) {
117+
const buffer = ArrayBufferViewGetBuffer(view);
118+
const byteOffset = ArrayBufferViewGetByteOffset(view);
119+
const byteLength = ArrayBufferViewGetByteLength(view);
120+
return new Uint8Array(
121+
ArrayBufferPrototypeSlice(buffer, byteOffset, byteOffset + byteLength)
122+
);
123+
}
124+
115125
function isBrandCheck(brand) {
116126
return (value) => {
117127
return value != null &&
@@ -236,6 +246,7 @@ module.exports = {
236246
ArrayBufferViewGetByteOffset,
237247
ArrayBufferGetByteLength,
238248
AsyncIterator,
249+
cloneAsUint8Array,
239250
copyArrayBuffer,
240251
customInspect,
241252
dequeueValue,

‎test/parallel/test-whatwg-readablestream.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1561,7 +1561,7 @@ class Source {
15611561
assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller));
15621562
readableStreamDefaultControllerEnqueue(controller);
15631563
readableByteStreamControllerClose(controller);
1564-
readableByteStreamControllerEnqueue(controller);
1564+
readableByteStreamControllerEnqueue(controller, new Uint8Array(1));
15651565
}
15661566

15671567
{

‎test/parallel/test-whatwg-readablestream.mjs

+34
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,37 @@ import assert from 'assert';
3434
assert.strictEqual(dataReader2, 'foobar');
3535
})().then(mustCall());
3636
}
37+
38+
{
39+
// Test ReadableByteStream.tee() with close in the nextTick after enqueue
40+
async function read(stream) {
41+
const chunks = [];
42+
for await (const chunk of stream)
43+
chunks.push(chunk);
44+
return Buffer.concat(chunks).toString();
45+
}
46+
47+
const [r1, r2] = new ReadableStream({
48+
type: 'bytes',
49+
start(controller) {
50+
process.nextTick(() => {
51+
controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]));
52+
53+
process.nextTick(() => {
54+
controller.close();
55+
});
56+
});
57+
}
58+
}).tee();
59+
60+
(async () => {
61+
const [dataReader1, dataReader2] = await Promise.all([
62+
read(r1),
63+
read(r2),
64+
]);
65+
66+
assert.strictEqual(dataReader1, dataReader2);
67+
assert.strictEqual(dataReader1, 'foobar');
68+
assert.strictEqual(dataReader2, 'foobar');
69+
})().then(mustCall());
70+
}

‎test/wpt/status/streams.json

-41
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,4 @@
11
{
2-
"piping/abort.any.js": {
3-
"fail": {
4-
"expected": [
5-
"pipeTo on a teed readable byte stream should only be aborted when both branches are aborted"
6-
]
7-
}
8-
},
92
"queuing-strategies-size-function-per-global.window.js": {
103
"skip": "Browser-specific test"
114
},
@@ -38,40 +31,6 @@
3831
]
3932
}
4033
},
41-
"readable-byte-streams/tee.any.js": {
42-
"fail": {
43-
"expected": [
44-
"ReadableStream teeing with byte source: should be able to read one branch to the end without affecting the other",
45-
"ReadableStream teeing with byte source: chunks should be cloned for each branch",
46-
"ReadableStream teeing with byte source: chunks for BYOB requests from branch 1 should be cloned to branch 2",
47-
"ReadableStream teeing with byte source: errors in the source should propagate to both branches",
48-
"ReadableStream teeing with byte source: closing the original should close the branches",
49-
"ReadableStream teeing with byte source: erroring the original should immediately error the branches",
50-
"ReadableStream teeing with byte source: erroring the original should error pending reads from BYOB reader",
51-
"ReadableStream teeing with byte source: canceling branch1 should finish when branch2 reads until end of stream",
52-
"ReadableStream teeing with byte source: canceling branch1 should finish when original stream errors",
53-
"ReadableStream teeing with byte source: should not pull any chunks if no branches are reading",
54-
"ReadableStream teeing with byte source: should only pull enough to fill the emptiest queue",
55-
"ReadableStream teeing with byte source: should not pull when original is already errored",
56-
"ReadableStream teeing with byte source: stops pulling when original stream errors while branch 1 is reading",
57-
"ReadableStream teeing with byte source: stops pulling when original stream errors while branch 2 is reading",
58-
"ReadableStream teeing with byte source: stops pulling when original stream errors while both branches are reading",
59-
"ReadableStream teeing with byte source: canceling both branches in sequence with delay",
60-
"ReadableStream teeing with byte source: failing to cancel when canceling both branches in sequence with delay",
61-
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, cancel branch2",
62-
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, cancel branch1",
63-
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, enqueue to branch1",
64-
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, respond to branch2",
65-
"ReadableStream teeing with byte source: pull with BYOB reader, then pull with default reader",
66-
"ReadableStream teeing with byte source: pull with default reader, then pull with BYOB reader",
67-
"ReadableStream teeing with byte source: read from branch2, then read from branch1",
68-
"ReadableStream teeing with byte source: read from branch1 with default reader, then close while branch2 has pending BYOB read",
69-
"ReadableStream teeing with byte source: read from branch2 with default reader, then close while branch1 has pending BYOB read",
70-
"ReadableStream teeing with byte source: close when both branches have pending BYOB reads",
71-
"ReadableStream teeing with byte source: respond() and close() while both branches are pulling"
72-
]
73-
}
74-
},
7534
"readable-streams/cross-realm-crash.window.js": {
7635
"skip": "Browser-specific test"
7736
},

0 commit comments

Comments
 (0)
Please sign in to comment.