Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow for capture of stdout/stderr from worker #425

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ The following options are available:
- `script: string`: the `script` option of this pool
Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created.
- `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
- `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`.

> Important note on `'workerType'`: when sending and receiving primitive data types (plain JSON) from and to a worker, the different worker types (`'web'`, `'process'`, `'thread'`) can be used interchangeably. However, when using more advanced data types like buffers, the API and returned results can vary. In these cases, it is best not to use the `'auto'` setting but have a fixed `'workerType'` and good unit testing in place.

Expand Down
20 changes: 20 additions & 0 deletions examples/consoleCapture.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var workerpool = require('..');

// create a worker pool using an the consoleWorker. This worker contains
// console.log & console.error functions.
var pool = workerpool.pool(__dirname + '/workers/consoleWorker.js', {emitStdStreams: true});


pool.exec('stdStreams', [], {
on: function (payload) {
if (payload.stdout) {
console.log(`captured stdout: ${payload.stdout.trim()}`) // outputs 'captured stdout: stdout message'
}
if (payload.stderr) {
console.log(`captured stderr: ${payload.stderr.trim()}`) // outputs 'captured stderr: stderr message'
}
}})
.then(function () {
pool.terminate(); // terminate all workers when done
});

15 changes: 15 additions & 0 deletions examples/workers/consoleWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// a simple worker
var workerpool = require('../..');

function stdStreams() {
console.log("stdout message")
console.error("stderr message")
return new Promise(function (resolve, reject) {
resolve('done');
});
}

// create a worker and register some functions
workerpool.worker({
stdStreams: stdStreams,
});
4 changes: 4 additions & 0 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ function Pool(script, options) {
/** @readonly */
this.onTerminateWorker = options.onTerminateWorker || (() => null);

/** @readonly */
this.emitStdStreams = options.emitStdStreams || false

// configuration
if (options && 'maxWorkers' in options) {
validateMaxWorkers(options.maxWorkers);
Expand Down Expand Up @@ -426,6 +429,7 @@ Pool.prototype._createWorkerHandler = function () {
debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart),
workerType: this.workerType,
workerTerminateTimeout: this.workerTerminateTimeout,
emitStdStreams: this.emitStdStreams,
});
}

Expand Down
46 changes: 38 additions & 8 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function setupWorker(script, options) {
return setupBrowserWorker(script, options.workerOpts, Worker);
} else if (options.workerType === 'thread') { // node.js only
WorkerThreads = ensureWorkerThreads();
return setupWorkerThreadWorker(script, WorkerThreads, options.workerThreadOpts);
return setupWorkerThreadWorker(script, WorkerThreads, options);
} else if (options.workerType === 'process' || !options.workerType) { // node.js only
return setupProcessWorker(script, resolveForkOptions(options), require('child_process'));
} else { // options.workerType === 'auto' or undefined
Expand All @@ -78,7 +78,7 @@ function setupWorker(script, options) {
else { // environment.platform === 'node'
var WorkerThreads = tryRequireWorkerThreads();
if (WorkerThreads) {
return setupWorkerThreadWorker(script, WorkerThreads, options.workerThreadOpts);
return setupWorkerThreadWorker(script, WorkerThreads, options);
} else {
return setupProcessWorker(script, resolveForkOptions(options), require('child_process'));
}
Expand Down Expand Up @@ -106,14 +106,14 @@ function setupBrowserWorker(script, workerOpts, Worker) {
return worker;
}

function setupWorkerThreadWorker(script, WorkerThreads, workerThreadOptions) {
function setupWorkerThreadWorker(script, WorkerThreads, options) {
// validate the options right before creating the worker thread (not when creating the pool)
validateOptions(workerThreadOptions, workerThreadOptsNames, 'workerThreadOpts')
validateOptions(options?.workerThreadOpts, workerThreadOptsNames, 'workerThreadOpts')

var worker = new WorkerThreads.Worker(script, {
stdout: false, // automatically pipe worker.STDOUT to process.STDOUT
stderr: false, // automatically pipe worker.STDERR to process.STDERR
...workerThreadOptions
stdout: options?.emitStdStreams ?? false, // pipe worker.STDOUT to process.STDOUT if not requested
stderr: options?.emitStdStreams ?? false, // pipe worker.STDERR to process.STDERR if not requested
...options?.workerThreadOpts
});
worker.isWorkerThread = true;
worker.send = function(message, transfer) {
Expand All @@ -129,6 +129,11 @@ function setupWorkerThreadWorker(script, WorkerThreads, workerThreadOptions) {
this.terminate();
};

if (options?.emitStdStreams) {
worker.stdout.on('data', (data) => worker.emit("stdout", data))
worker.stderr.on('data', (data) => worker.emit("stderr", data))
}

return worker;
}

Expand All @@ -149,6 +154,11 @@ function setupProcessWorker(script, options, child_process) {
return send.call(worker, message);
};

if (options.emitStdStreams) {
worker.stdout.on('data', (data) => worker.emit("stdout", data))
worker.stderr.on('data', (data) => worker.emit("stderr", data))
}

worker.isChildProcess = true;
return worker;
}
Expand Down Expand Up @@ -180,7 +190,8 @@ function resolveForkOptions(opts) {
forkArgs: opts.forkArgs,
forkOpts: Object.assign({}, opts.forkOpts, {
execArgv: (opts.forkOpts && opts.forkOpts.execArgv || [])
.concat(execArgv)
.concat(execArgv),
stdio: opts.emitStdStreams ? "pipe": undefined
})
});
}
Expand All @@ -201,6 +212,17 @@ function objectToError (obj) {
return temp
}

function handleEmittedStdPayload(handler, payload) {
// TODO: refactor if parallel task execution gets added
if (Object.keys(handler.processing).length !== 1) {
return;
}
var task = Object.values(handler.processing)[0]
if (task.options && typeof task.options.on === 'function') {
task.options.on(payload);
}
}

/**
* A WorkerHandler controls a single worker. This worker can be a child process
* on node.js or a WebWorker in a browser environment.
Expand Down Expand Up @@ -229,6 +251,14 @@ function WorkerHandler(script, _options) {

// queue for requests that are received before the worker is ready
this.requestQueue = [];

this.worker.on("stdout", function (data) {
handleEmittedStdPayload(me, {"stdout": data.toString()})
})
this.worker.on("stderr", function (data) {
handleEmittedStdPayload(me, {"stderr": data.toString()})
})

this.worker.on('message', function (response) {
if (me.terminated) {
return;
Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* @property {import('child_process').ForkOptions} [forkOpts] For `process` worker type. An object passed as `options` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options).
* @property {WorkerOptions} [workerOpts] For `web` worker type. An object passed to the [constructor of the web worker](https://html.spec.whatwg.org/multipage/workers.html#dom-worker). See [WorkerOptions specification](https://html.spec.whatwg.org/multipage/workers.html#workeroptions) for available options.
* @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options).
* @property {boolean} [emitStdStreams] Emit stdout and stdout from worker under the `stdout` and `stderr` events. Not supported by `web` worker type.
* @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created.
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
*/
Expand Down
100 changes: 100 additions & 0 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,106 @@ describe('Pool', function () {
assert(terminatedWorkers.includes('env_value2'), 'terminatedWorkers should include the value with counter = 2');
});
});

it('supports stdout/stderr capture via fork', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process', emitStdStreams: true});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, [{
stdout: 'stdout message\n'
}, {
stderr: 'stderr message\n'
}]);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('excludes stdout/stderr capture via fork', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process'});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, []);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('supports stdout/stderr capture via threads', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads', emitStdStreams: true});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, [{
stdout: 'stdout message\n'
}, {
stderr: 'stderr message\n'
}]);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('excludes stdout/stderr capture via threads', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads'});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, []);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('should offload a function to a worker', function (done) {
var pool = createPool({maxWorkers: 10});
Expand Down
15 changes: 15 additions & 0 deletions test/workers/console.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// a simple worker
var workerpool = require('../../');

function stdStreams() {
console.log("stdout message")
console.error("stderr message")
return new Promise(function (resolve, reject) {
resolve('done');
});
}

// create a worker and register some functions
workerpool.worker({
stdStreams: stdStreams,
});