
Commit a51d085

Authored Mar 14, 2024
Allow multiple readers at once (#121)
1 parent 9749f68 commit a51d085
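
What the change enables: get-stream now iterates Node.js readable streams through their 'data' events instead of the streams' default async iterator, so several consumers can read the same stream at the same time (see the new tests in test/stream.js below). A minimal sketch of the newly supported usage — illustrative only, not code from this diff; it assumes get-stream's documented default export:

import {once} from 'node:events';
import {Readable} from 'node:stream';
import getStream from 'get-stream';

const stream = Readable.from(['uni', 'corn']);

// Both calls receive every chunk, because each 'data' event is delivered
// to all listeners; before this commit concurrent readers were not supported.
const [first, second] = await Promise.all([
	getStream(stream),
	getStream(stream),
]);
console.log(first, second); // 'unicorn' 'unicorn'

// A plain 'data' listener can likewise run alongside getStream():
const other = Readable.from(['uni', 'corn']);
const [contents, [firstChunk]] = await Promise.all([
	getStream(other),
	once(other, 'data'),
]);
console.log(contents, firstChunk); // 'unicorn' 'uni'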

5 files changed: +391, -6

package.json (+4)

@@ -42,9 +42,13 @@
 		"object",
 		"concat"
 	],
+	"dependencies": {
+		"is-stream": "^4.0.1"
+	},
 	"devDependencies": {
 		"@types/node": "^20.8.9",
 		"ava": "^5.3.1",
+		"onetime": "^7.0.0",
 		"precise-now": "^3.0.0",
 		"stream-json": "^1.8.0",
 		"tsd": "^0.29.0",

source/contents.js (+4, -6)

@@ -1,13 +1,13 @@
+import {getAsyncIterable} from './stream.js';
+
 export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
-	if (!isAsyncIterable(stream)) {
-		throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
-	}
+	const asyncIterable = getAsyncIterable(stream);
 
 	const state = init();
 	state.length = 0;
 
 	try {
-		for await (const chunk of stream) {
+		for await (const chunk of asyncIterable) {
 			const chunkType = getChunkType(chunk);
 			const convertedChunk = convertChunk[chunkType](chunk, state);
 			appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
@@ -52,8 +52,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
 	state.length = newLength;
 };
 
-const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';
-
 const getChunkType = chunk => {
 	const typeOfChunk = typeof chunk;
 
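The refactor above only relocates the input check: anything that is not a Node.js Readable but implements Symbol.asyncIterator is returned unchanged by getAsyncIterable() (see source/stream.js below), so plain async iterables keep working as before. A small illustrative sketch, not part of this diff:

import getStream from 'get-stream';

// An async generator is accepted directly and bypasses the new
// 'data'-event based iteration entirely.
const generate = async function * () {
	yield 'uni';
	yield 'corn';
};

console.log(await getStream(generate())); // 'unicorn'
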
source/stream.js (new file, +66)

@@ -0,0 +1,66 @@
+import {isReadableStream} from 'is-stream';
+
+export const getAsyncIterable = stream => {
+	if (isReadableStream(stream, {checkOpen: false})) {
+		return getStreamIterable(stream);
+	}
+
+	if (typeof stream?.[Symbol.asyncIterator] !== 'function') {
+		throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.');
+	}
+
+	return stream;
+};
+
+// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
+const getStreamIterable = async function * (stream) {
+	if (nodeImports === undefined) {
+		await loadNodeImports();
+	}
+
+	const controller = new AbortController();
+	const state = {};
+	handleStreamEnd(stream, controller, state);
+
+	try {
+		for await (const [chunk] of nodeImports.events.on(stream, 'data', {
+			signal: controller.signal,
+			highWatermark: stream.readableHighWaterMark,
+		})) {
+			yield chunk;
+		}
+	} catch (error) {
+		// Stream failure, for example due to `stream.destroy(error)`
+		if (state.error !== undefined) {
+			throw state.error;
+		// `error` event directly emitted on stream
+		} else if (!controller.signal.aborted) {
+			throw error;
+		// Otherwise, stream completed successfully
+		}
+	// The `finally` block also runs when the caller throws, for example due to the `maxBuffer` option
+	} finally {
+		stream.destroy();
+	}
+};
+
+const handleStreamEnd = async (stream, controller, state) => {
+	try {
+		await nodeImports.streamPromises.finished(stream, {cleanup: true, readable: true, writable: false, error: false});
+	} catch (error) {
+		state.error = error;
+	} finally {
+		controller.abort();
+	}
+};
+
+// Use dynamic imports to support browsers
+const loadNodeImports = async () => {
+	const [events, streamPromises] = await Promise.all([
+		import('node:events'),
+		import('node:stream/promises'),
+	]);
+	nodeImports = {events, streamPromises};
+};
+
+let nodeImports;
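
The building block above is events.on() from node:events: it exposes an emitter's events as an async iterator without touching stream[Symbol.asyncIterator](), while finished() from node:stream/promises plus an AbortController stop the iteration once the readable side ends or errors. A stand-alone sketch of that mechanism on a generic EventEmitter — illustrative only, with none of the library's maxBuffer or browser handling:

import {on, EventEmitter} from 'node:events';
import {setTimeout as wait} from 'node:timers/promises';

const emitter = new EventEmitter();
const controller = new AbortController();

const consume = async () => {
	try {
		// Each iteration receives the event's arguments as an array.
		for await (const [chunk] of on(emitter, 'data', {signal: controller.signal})) {
			console.log('received', chunk);
		}
	} catch (error) {
		// Aborting the signal rejects the iteration; treat that as normal
		// completion, like getStreamIterable() does once the stream has ended.
		if (!controller.signal.aborted) {
			throw error;
		}
	}
};

const consumer = consume();
emitter.emit('data', 'uni');
emitter.emit('data', 'corn');
await wait(0); // Lets the consumer drain the queued events ('uni', then 'corn')
controller.abort(); // Plays the role of handleStreamEnd() after the stream finishes
await consumer;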

test/fixtures/index.js (+2)

@@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000';
 export const longMultibyteString = `${fixtureMultibyteString}\u1000`;
 
 export const bigArray = Array.from({length: 1e5}, () => Math.floor(Math.random() * (2 ** 8)));
+
+export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'};

test/stream.js (new file, +315)

@@ -0,0 +1,315 @@
+import {once} from 'node:events';
+import {version} from 'node:process';
+import {Readable, Duplex} from 'node:stream';
+import {finished} from 'node:stream/promises';
+import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises';
+import test from 'ava';
+import onetime from 'onetime';
+import getStream, {getStreamAsArray, MaxBufferError} from '../source/index.js';
+import {fixtureString, fixtureMultiString, prematureClose} from './fixtures/index.js';
+
+const onFinishedStream = stream => finished(stream, {cleanup: true});
+const noopMethods = {read() {}, write() {}};
+
+// eslint-disable-next-line max-params
+const assertStream = ({readableEnded = false, writableEnded = false}, t, stream, StreamClass, error = null) => {
+	t.is(stream.errored, error);
+	t.true(stream.destroyed);
+	t.false(stream.readable);
+	t.is(stream.readableEnded, readableEnded);
+
+	if (StreamClass === Duplex) {
+		t.false(stream.writable);
+		t.is(stream.writableEnded, writableEnded);
+	}
+};
+
+const assertSuccess = assertStream.bind(undefined, {readableEnded: true, writableEnded: true});
+const assertReadFail = assertStream.bind(undefined, {writableEnded: true});
+const assertWriteFail = assertStream.bind(undefined, {readableEnded: true});
+const assertBothFail = assertStream.bind(undefined, {});
+
+const testSuccess = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	t.true(stream instanceof StreamClass);
+
+	t.deepEqual(await getStreamAsArray(stream), fixtureMultiString);
+	assertSuccess(t, stream, StreamClass);
+};
+
+test('Can use Readable stream', testSuccess, Readable);
+test('Can use Duplex stream', testSuccess, Duplex);
+
+const testAlreadyEnded = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	await stream.toArray();
+	assertSuccess(t, stream, StreamClass);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+};
+
+test('Can use already ended Readable', testAlreadyEnded, Readable);
+test('Can use already ended Duplex', testAlreadyEnded, Duplex);
+
+const testAlreadyAborted = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	stream.destroy();
+	await t.throwsAsync(onFinishedStream(stream), prematureClose);
+	assertReadFail(t, stream, StreamClass);
+
+	const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
+	t.deepEqual(error.bufferedData, []);
+};
+
+test('Throw if already aborted Readable', testAlreadyAborted, Readable);
+test('Throw if already aborted Duplex', testAlreadyAborted, Duplex);
+
+const testAlreadyErrored = async (t, StreamClass) => {
+	const stream = StreamClass.from(fixtureMultiString);
+	const error = new Error('test');
+	stream.destroy(error);
+	t.is(await t.throwsAsync(onFinishedStream(stream)), error);
+	assertReadFail(t, stream, StreamClass, error);
+
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+};
+
+test('Throw if already errored Readable', testAlreadyErrored, Readable);
+test('Throw if already errored Duplex', testAlreadyErrored, Duplex);
+
+const testAbort = async (t, StreamClass) => {
+	const stream = new StreamClass(noopMethods);
+	setTimeout(() => {
+		stream.destroy();
+	}, 0);
+	const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass);
+};
+
+test('Throw when aborting Readable', testAbort, Readable);
+test('Throw when aborting Duplex', testAbort, Duplex);
+
+const testError = async (t, StreamClass) => {
+	const stream = new StreamClass(noopMethods);
+	const error = new Error('test');
+	setTimeout(() => {
+		stream.destroy(error);
+	}, 0);
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass, error);
+};
+
+test('Throw when erroring Readable', testError, Readable);
+test('Throw when erroring Duplex', testError, Duplex);
+
+const testErrorEvent = async (t, StreamClass, hasCause) => {
+	const stream = new StreamClass(noopMethods);
+	const error = new Error('test', hasCause ? {cause: new Error('inner')} : {});
+	setTimeout(() => {
+		stream.emit('error', error);
+	}, 0);
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass);
+};
+
+test('Throw when emitting "error" event with Readable', testErrorEvent, Readable, false);
+test('Throw when emitting "error" event with Duplex', testErrorEvent, Duplex, false);
+test('Throw when emitting "error" event with Readable and error.cause', testErrorEvent, Readable, true);
+test('Throw when emitting "error" event with Duplex and error.cause', testErrorEvent, Duplex, true);
+
+const testThrowRead = async (t, StreamClass) => {
+	const error = new Error('test');
+	const stream = new StreamClass({
+		read() {
+			throw error;
+		},
+	});
+	t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
+	t.deepEqual(error.bufferedData, []);
+	assertBothFail(t, stream, StreamClass, error);
+};
+
+test('Throw when throwing error in Readable read()', testThrowRead, Readable);
+test('Throw when throwing error in Duplex read()', testThrowRead, Duplex);
+
+test('Throw when throwing error in Readable destroy()', async t => {
+	const error = new Error('test');
+	const stream = new Readable({
+		read: onetime(function () {
+			this.push(fixtureString);
+			this.push(null);
+		}),
+		destroy(_, done) {
+			done(error);
+		},
+	});
+
+	t.is(await t.throwsAsync(getStream(stream)), error);
+	t.deepEqual(error.bufferedData, fixtureString);
+	assertSuccess(t, stream, Readable, error);
+});
+
+test('Throw when throwing error in Duplex final()', async t => {
+	const error = new Error('test');
+	const stream = new Duplex({
+		read: onetime(function () {
+			this.push(null);
+		}),
+		final(done) {
+			done(error);
+		},
+	});
+	stream.end();
+
+	t.is(await t.throwsAsync(getStream(stream)), error);
+	t.is(await t.throwsAsync(onFinishedStream(stream)), error);
+	assertReadFail(t, stream, Duplex, error);
+});
+
+test('Does not wait for Duplex writable side', async t => {
+	const error = new Error('test');
+	const stream = new Duplex({
+		read: onetime(function () {
+			this.push(null);
+		}),
+		destroy(_, done) {
+			done(error);
+		},
+	});
+
+	t.is(await getStream(stream), '');
+	t.is(await t.throwsAsync(onFinishedStream(stream)), error);
+	assertWriteFail(t, stream, Duplex, error);
+});
+
+test('Handle non-error instances', async t => {
+	const stream = Readable.from(fixtureMultiString);
+	const errorMessage = `< ${fixtureString} >`;
+	stream.destroy(errorMessage);
+	const [{reason}] = await Promise.allSettled([onFinishedStream(stream)]);
+	t.is(reason, errorMessage);
+	assertReadFail(t, stream, Readable, errorMessage);
+
+	await t.throwsAsync(getStreamAsArray(stream), {message: errorMessage});
+});
+
+test('Handles objectMode errors', async t => {
+	const stream = new Readable({
+		read: onetime(function () {
+			this.push(fixtureString);
+			this.push({});
+		}),
+		objectMode: true,
+	});
+
+	const error = await t.throwsAsync(getStream(stream), {message: /in object mode/});
+	t.is(error.bufferedData, fixtureString);
+	assertReadFail(t, stream, Readable);
+});
+
+test('Handles maxBuffer errors', async t => {
+	const stream = new Readable({
+		read: onetime(function () {
+			this.push(fixtureString);
+			this.push(fixtureString);
+		}),
+	});
+
+	const error = await t.throwsAsync(
+		getStream(stream, {maxBuffer: fixtureString.length}),
+		{instanceOf: MaxBufferError},
+	);
+	t.is(error.bufferedData, fixtureString);
+	assertReadFail(t, stream, Readable);
+});
+
+test('Works if Duplex readable side ends before its writable side', async t => {
+	const stream = new Duplex(noopMethods);
+	stream.push(null);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+	assertWriteFail(t, stream, Duplex);
+});
+
+test('Cleans up event listeners', async t => {
+	const stream = Readable.from([]);
+	t.is(stream.listenerCount('error'), 0);
+
+	t.deepEqual(await getStreamAsArray(stream), []);
+
+	t.is(stream.listenerCount('error'), 0);
+});
+
+const testMultipleReads = async (t, wait) => {
+	const size = 10;
+	const stream = new Readable({
+		read: onetime(async function () {
+			for (let index = 0; index < size; index += 1) {
+				for (let index = 0; index < size; index += 1) {
+					this.push(fixtureString);
+				}
+
+				// eslint-disable-next-line no-await-in-loop
+				await wait();
+			}
+
+			this.push(null);
+		}),
+	});
+
+	t.is(await getStream(stream), fixtureString.repeat(size * size));
+	assertSuccess(t, stream, Readable);
+};
+
+test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield());
+test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100));
+
+// The `highWaterMark` option was added to `once()` by Node 20.
+// See https://github.com/nodejs/node/pull/41276
+const nodeMajor = version.split('.')[0].slice(1);
+if (nodeMajor >= 20) {
+	test('Pause stream when too much data at once', async t => {
+		const stream = new Readable({
+			read: onetime(function () {
+				this.push('.');
+				this.push('.');
+				this.push('.');
+				this.push('.');
+				this.push(null);
+			}),
+			highWaterMark: 2,
+		});
+		const [result] = await Promise.all([
+			getStream(stream),
+			once(stream, 'pause'),
+		]);
+		t.is(result, '....');
+		assertSuccess(t, stream, Readable);
+	});
+}
+
+test('Can call twice at the same time', async t => {
+	const stream = Readable.from(fixtureMultiString);
+	const [result, secondResult] = await Promise.all([
+		getStream(stream),
+		getStream(stream),
+	]);
+	t.deepEqual(result, fixtureString);
+	t.deepEqual(secondResult, fixtureString);
+	assertSuccess(t, stream, Readable);
+});
+
+test('Can call and listen to "data" event at the same time', async t => {
+	const stream = Readable.from([fixtureString]);
+	const [result, secondResult] = await Promise.all([
+		getStream(stream),
+		once(stream, 'data'),
+	]);
+	t.deepEqual(result, fixtureString);
+	t.deepEqual(secondResult.toString(), fixtureString);
+	assertSuccess(t, stream, Readable);
+});
