stream: add pipeline() for webstreams
Refs: #39316
PR-URL: #46307
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
debadree25 authored and MylesBorins committed Feb 18, 2023
1 parent 4cf4b41 commit c206853
Showing 5 changed files with 500 additions and 10 deletions.
12 changes: 8 additions & 4 deletions doc/api/stream.md
@@ -2696,6 +2696,9 @@ const cleanup = finished(rs, (err) => {
<!-- YAML
added: v10.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46307
description: Added support for webstreams.
- version: v18.0.0
pr-url: https://github.com/nodejs/node/pull/41678
description: Passing an invalid callback to the `callback` argument
@@ -2712,13 +2715,14 @@ changes:
description: Add support for async generators.
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `source` {Stream|Iterable|AsyncIterable|Function}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
* Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function}
* `...transforms` {Stream|Function|TransformStream}
* `source` {AsyncIterable}
* Returns: {AsyncIterable}
* `destination` {Stream|Function}
* `destination` {Stream|Function|WritableStream}
* `source` {AsyncIterable}
* Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done.
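
A hedged usage sketch of the newly documented behavior (not part of the commit), mirroring the patterns exercised by the new test file further down; the chunk values and logging are illustrative:

'use strict';
const { pipeline } = require('stream');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');

const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  }
});

// Upper-cases each chunk as it passes through.
const ts = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

const ws = new WritableStream({
  write(chunk) {
    console.log(chunk); // HELLO, then WORLD
  }
});

// Web streams can now be mixed into the callback-style pipeline().
pipeline(rs, ts, ws, (err) => {
  if (err) {
    console.error('pipeline failed', err);
  } else {
    console.log('pipeline done');
  }
});
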
68 changes: 63 additions & 5 deletions lib/internal/streams/pipeline.js
@@ -35,6 +35,9 @@ const {
isReadable,
isReadableNodeStream,
isNodeStream,
isTransformStream,
isWebStream,
isReadableStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

@@ -88,7 +91,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish, { end }) {
async function pumpToNode(iterable, writable, finish, { end }) {
let error;
let onresolve = null;

@@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
}
}

async function pumpToWeb(readable, writable, finish, { end }) {
if (isTransformStream(writable)) {
writable = writable.writable;
}
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
const writer = writable.getWriter();
try {
for await (const chunk of readable) {
await writer.ready;
writer.write(chunk).catch(() => {});
}

await writer.ready;

if (end) {
await writer.close();
}

finish();
} catch (err) {
try {
await writer.abort(err);
finish(err);
} catch (err) {
finish(err);
}
}
}

function pipeline(...streams) {
return pipelineImpl(streams, once(popCallback(streams)));
}
@@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
ret = Duplex.from(stream);
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
if (isTransformStream(ret)) {
ret = makeAsyncIterable(ret?.readable);
} else {
ret = makeAsyncIterable(ret);
}
ret = stream(ret, { signal });

if (reading) {
@@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish, { end });
pumpToNode(ret, pt, finish, { end });
} else if (isReadableStream(ret) || isTransformStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
@@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isTransformStream(ret) || isReadableStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, stream, finish, { end });
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
pumpToNode(ret, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
}
ret = stream;
} else if (isWebStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount++;
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
} else if (isReadableStream(ret) || isIterable(ret)) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
}
ret = stream;
} else {
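
For context, pumpToWeb() above follows the manual-write-with-backpressure pattern from the WHATWG Streams example it links to. A standalone sketch of that pattern, with an illustrative function name and without the commit's finish()/end handling (not part of the commit):

async function pumpWithBackpressure(iterable, writableStream) {
  const writer = writableStream.getWriter();
  try {
    for await (const chunk of iterable) {
      // Respect backpressure: wait until the destination signals it wants more data.
      await writer.ready;
      // Do not await write(); failures surface through ready/abort, so the
      // catch() only suppresses an unhandled-rejection warning.
      writer.write(chunk).catch(() => {});
    }
    // Let pending writes be accepted before closing.
    await writer.ready;
    await writer.close();
  } catch (err) {
    // Abort the destination and propagate the error.
    await writer.abort(err);
    throw err;
  }
}

In the commit itself, finish(err) takes the place of the rethrow, close() is skipped when end is false, and a TransformStream destination is first unwrapped to its writable side.
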
15 changes: 15 additions & 0 deletions lib/internal/streams/utils.js
@@ -77,6 +77,19 @@ function isWritableStream(obj) {
);
}

function isTransformStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);
}

function isWebStream(obj) {
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -303,6 +316,7 @@ module.exports = {
isReadableFinished,
isReadableErrored,
isNodeStream,
isWebStream,
isWritable,
isWritableNodeStream,
isWritableStream,
@@ -312,4 +326,5 @@ module.exports = {
isServerRequest,
isServerResponse,
willEmitClose,
isTransformStream,
};
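
A hedged illustration of the duck-typing above (names are illustrative, not part of the commit): a web TransformStream exposes object-valued readable and writable properties, while a Node.js Duplex exposes booleans, so only the former matches the shape isTransformStream() checks for:

const { Duplex } = require('stream');
const { TransformStream } = require('stream/web');

// Same shape check as isTransformStream(), minus the isNodeStream() guard.
function looksLikeTransformStream(obj) {
  return !!(obj &&
            typeof obj.readable === 'object' &&
            typeof obj.writable === 'object');
}

const webTransform = new TransformStream();
const nodeDuplex = new Duplex({ read() {}, write(chunk, enc, cb) { cb(); } });

console.log(looksLikeTransformStream(webTransform)); // true
console.log(looksLikeTransformStream(nodeDuplex));   // false: .readable/.writable are booleans here
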
3 changes: 2 additions & 1 deletion lib/stream/promises.js
@@ -8,6 +8,7 @@ const {
const {
isIterable,
isNodeStream,
isWebStream,
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
@@ -21,7 +22,7 @@ function pipeline(...streams) {
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
!isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
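
The isWebStream() check matters here because the promises-based pipeline() inspects its last argument to decide whether it is an options bag; without the check, a trailing WritableStream would be popped off as options. A hedged sketch of the resulting usage (mirroring the pipelinePromise tests below, not part of the commit):

const { pipeline } = require('stream/promises');
const { ReadableStream, WritableStream } = require('stream/web');

async function main() {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('hello');
      controller.close();
    }
  });

  const ws = new WritableStream({
    write(chunk) {
      console.log(chunk); // hello
    }
  });

  // The trailing WritableStream is treated as the destination rather than
  // being mistaken for an options object.
  await pipeline(rs, ws);
}

main().catch(console.error);
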
412 changes: 412 additions & 0 deletions test/parallel/test-webstreams-pipeline.js
@@ -0,0 +1,412 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable, Writable, Transform, pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
const http = require('http');

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk);
}
});

pipeline(rs, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write() { }
});

pipeline(rs, ws, common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));

c.error(new Error('kaboom'));
}

{
let c;
const values = [];
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});

const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});

pipeline(rs, ts, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
function makeTransformStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString());
}
});
}

let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write() { }
});

pipeline(rs,
makeTransformStream(),
makeTransformStream(),
makeTransformStream(),
makeTransformStream(),
ws,
common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));

c.error(new Error('kaboom'));
}

{
const values = [];

const r = new Readable({
read() { }
});

const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});

pipeline(r, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['helloworld']);
}));

r.push('hello');
r.push('world');
r.push(null);
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const w = new Writable({
write(chunk, encoding, callback) {
values.push(chunk?.toString());
callback();
}
});

pipeline(rs, w, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});

const t = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk?.toString().toUpperCase());
}
});

pipeline(rs, t, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLOWORLD']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
const server = http.createServer((req, res) => {
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
}
});
pipeline(rs, res, common.mustSucceed(() => {}));
});

server.listen(0, common.mustCall(() => {
const req = http.request({
port: server.address().port
});
req.end();
const values = [];
req.on('response', (res) => {
res.on('data', (chunk) => {
values.push(chunk?.toString());
});
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
server.close();
}));
});
}));
}

{
const values = [];
const server = http.createServer((req, res) => {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});
pipeline(req, ts, res, common.mustSucceed());
});

server.listen(0, () => {
const req = http.request({
port: server.address().port,
method: 'POST',
});


const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.close();
}
});

pipeline(rs, req, common.mustSucceed());

req.on('response', (res) => {
res.on('data', (chunk) => {
values.push(chunk?.toString());
}
);
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(values, ['HELLO']);
server.close();
}));
});
});
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});

pipelinePromise(rs, ws).then(common.mustCall(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write() { }
});

pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));

c.error(new Error('kaboom'));
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

pipeline(rs, async function(source) {
for await (const chunk of source) {
values.push(chunk?.toString());
}
}, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
const rs = new ReadableStream({
start() {}
});

pipeline(rs, async function(source) {
throw new Error('kaboom');
}, (err) => {
assert.strictEqual(err?.message, 'kaboom');
});
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});

pipeline(rs, ts, async function(source) {
for await (const chunk of source) {
values.push(chunk?.toString());
}
}, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});

pipeline(rs, async function* (source) {
for await (const chunk of source) {
yield chunk?.toString().toUpperCase();
}
}, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));

c.enqueue('hello');
c.enqueue('world');
c.close();
}

{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});

const ws = new WritableStream({
write(chunk) { }
}, { highWaterMark: 0 });

pipeline(rs, ws, common.mustNotCall());

for (let i = 0; i < 10; i++) {
c.enqueue(`${i}`);
}
c.close();
}
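
The last test above expects the callback never to fire (common.mustNotCall()). A hedged reading of why: with highWaterMark: 0 the writer's desired size can never become positive, so the await writer.ready inside pumpToWeb() stays pending and the pipeline never settles. A minimal sketch of that behavior (not part of the commit):

const { WritableStream } = require('stream/web');

const ws = new WritableStream({ write() {} }, { highWaterMark: 0 });
const writer = ws.getWriter();

console.log(writer.desiredSize); // 0 -> backpressure is signalled from the start

// With a high-water mark of 0 the desired size never exceeds 0, so this
// promise is expected to stay pending indefinitely.
writer.ready.then(() => console.log('not expected to run'));
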
