Skip to content

Commit

Permalink
feat: allow for capture of stdout/stderr from worker (#425)
Browse files Browse the repository at this point in the history
* feat: allow for capture of stdout/stderr from worker

Signed-off-by: Chapman Pendery <cpendery@vt.edu>

* fix: pr feedback

Signed-off-by: Chapman Pendery <cpendery@vt.edu>

* fix: small typo

Signed-off-by: Chapman Pendery <cpendery@vt.edu>

---------

Signed-off-by: Chapman Pendery <cpendery@vt.edu>
  • Loading branch information
cpendery committed Jan 18, 2024
1 parent 4dc9fcb commit 78fe8cc
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ The following options are available:
- `script: string`: the `script` option of this pool
Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created.
- `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
- `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`.

> Important note on `'workerType'`: when sending and receiving primitive data types (plain JSON) from and to a worker, the different worker types (`'web'`, `'process'`, `'thread'`) can be used interchangeably. However, when using more advanced data types like buffers, the API and returned results can vary. In these cases, it is best not to use the `'auto'` setting but have a fixed `'workerType'` and good unit testing in place.
Expand Down
20 changes: 20 additions & 0 deletions examples/consoleCapture.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var workerpool = require('..');

// create a worker pool using an the consoleWorker. This worker contains
// console.log & console.error functions.
var pool = workerpool.pool(__dirname + '/workers/consoleWorker.js', {emitStdStreams: true});


pool.exec('stdStreams', [], {
on: function (payload) {
if (payload.stdout) {
console.log(`captured stdout: ${payload.stdout.trim()}`) // outputs 'captured stdout: stdout message'
}
if (payload.stderr) {
console.log(`captured stderr: ${payload.stderr.trim()}`) // outputs 'captured stderr: stderr message'
}
}})
.then(function () {
pool.terminate(); // terminate all workers when done
});

15 changes: 15 additions & 0 deletions examples/workers/consoleWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// a simple worker
var workerpool = require('../..');

function stdStreams() {
console.log("stdout message")
console.error("stderr message")
return new Promise(function (resolve, reject) {
resolve('done');
});
}

// create a worker and register some functions
workerpool.worker({
stdStreams: stdStreams,
});
4 changes: 4 additions & 0 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ function Pool(script, options) {
/** @readonly */
this.onTerminateWorker = options.onTerminateWorker || (() => null);

/** @readonly */
this.emitStdStreams = options.emitStdStreams || false

// configuration
if (options && 'maxWorkers' in options) {
validateMaxWorkers(options.maxWorkers);
Expand Down Expand Up @@ -426,6 +429,7 @@ Pool.prototype._createWorkerHandler = function () {
debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart),
workerType: this.workerType,
workerTerminateTimeout: this.workerTerminateTimeout,
emitStdStreams: this.emitStdStreams,
});
}

Expand Down
46 changes: 38 additions & 8 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function setupWorker(script, options) {
return setupBrowserWorker(script, options.workerOpts, Worker);
} else if (options.workerType === 'thread') { // node.js only
WorkerThreads = ensureWorkerThreads();
return setupWorkerThreadWorker(script, WorkerThreads, options.workerThreadOpts);
return setupWorkerThreadWorker(script, WorkerThreads, options);
} else if (options.workerType === 'process' || !options.workerType) { // node.js only
return setupProcessWorker(script, resolveForkOptions(options), require('child_process'));
} else { // options.workerType === 'auto' or undefined
Expand All @@ -78,7 +78,7 @@ function setupWorker(script, options) {
else { // environment.platform === 'node'
var WorkerThreads = tryRequireWorkerThreads();
if (WorkerThreads) {
return setupWorkerThreadWorker(script, WorkerThreads, options.workerThreadOpts);
return setupWorkerThreadWorker(script, WorkerThreads, options);
} else {
return setupProcessWorker(script, resolveForkOptions(options), require('child_process'));
}
Expand Down Expand Up @@ -106,14 +106,14 @@ function setupBrowserWorker(script, workerOpts, Worker) {
return worker;
}

function setupWorkerThreadWorker(script, WorkerThreads, workerThreadOptions) {
function setupWorkerThreadWorker(script, WorkerThreads, options) {
// validate the options right before creating the worker thread (not when creating the pool)
validateOptions(workerThreadOptions, workerThreadOptsNames, 'workerThreadOpts')
validateOptions(options?.workerThreadOpts, workerThreadOptsNames, 'workerThreadOpts')

var worker = new WorkerThreads.Worker(script, {
stdout: false, // automatically pipe worker.STDOUT to process.STDOUT
stderr: false, // automatically pipe worker.STDERR to process.STDERR
...workerThreadOptions
stdout: options?.emitStdStreams ?? false, // pipe worker.STDOUT to process.STDOUT if not requested
stderr: options?.emitStdStreams ?? false, // pipe worker.STDERR to process.STDERR if not requested
...options?.workerThreadOpts
});
worker.isWorkerThread = true;
worker.send = function(message, transfer) {
Expand All @@ -129,6 +129,11 @@ function setupWorkerThreadWorker(script, WorkerThreads, workerThreadOptions) {
this.terminate();
};

if (options?.emitStdStreams) {
worker.stdout.on('data', (data) => worker.emit("stdout", data))
worker.stderr.on('data', (data) => worker.emit("stderr", data))
}

return worker;
}

Expand All @@ -149,6 +154,11 @@ function setupProcessWorker(script, options, child_process) {
return send.call(worker, message);
};

if (options.emitStdStreams) {
worker.stdout.on('data', (data) => worker.emit("stdout", data))
worker.stderr.on('data', (data) => worker.emit("stderr", data))
}

worker.isChildProcess = true;
return worker;
}
Expand Down Expand Up @@ -180,7 +190,8 @@ function resolveForkOptions(opts) {
forkArgs: opts.forkArgs,
forkOpts: Object.assign({}, opts.forkOpts, {
execArgv: (opts.forkOpts && opts.forkOpts.execArgv || [])
.concat(execArgv)
.concat(execArgv),
stdio: opts.emitStdStreams ? "pipe": undefined
})
});
}
Expand All @@ -201,6 +212,17 @@ function objectToError (obj) {
return temp
}

function handleEmittedStdPayload(handler, payload) {
// TODO: refactor if parallel task execution gets added
if (Object.keys(handler.processing).length !== 1) {
return;
}
var task = Object.values(handler.processing)[0]
if (task.options && typeof task.options.on === 'function') {
task.options.on(payload);
}
}

/**
* A WorkerHandler controls a single worker. This worker can be a child process
* on node.js or a WebWorker in a browser environment.
Expand Down Expand Up @@ -229,6 +251,14 @@ function WorkerHandler(script, _options) {

// queue for requests that are received before the worker is ready
this.requestQueue = [];

this.worker.on("stdout", function (data) {
handleEmittedStdPayload(me, {"stdout": data.toString()})
})
this.worker.on("stderr", function (data) {
handleEmittedStdPayload(me, {"stderr": data.toString()})
})

this.worker.on('message', function (response) {
if (me.terminated) {
return;
Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* @property {import('child_process').ForkOptions} [forkOpts] For `process` worker type. An object passed as `options` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options).
* @property {WorkerOptions} [workerOpts] For `web` worker type. An object passed to the [constructor of the web worker](https://html.spec.whatwg.org/multipage/workers.html#dom-worker). See [WorkerOptions specification](https://html.spec.whatwg.org/multipage/workers.html#workeroptions) for available options.
* @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options).
* @property {boolean} [emitStdStreams] Emit stdout and stdout from worker under the `stdout` and `stderr` events. Not supported by `web` worker type.
* @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created.
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
*/
Expand Down
100 changes: 100 additions & 0 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,106 @@ describe('Pool', function () {
assert(terminatedWorkers.includes('env_value2'), 'terminatedWorkers should include the value with counter = 2');
});
});

it('supports stdout/stderr capture via fork', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process', emitStdStreams: true});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, [{
stdout: 'stdout message\n'
}, {
stderr: 'stderr message\n'
}]);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('excludes stdout/stderr capture via fork', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process'});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, []);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('supports stdout/stderr capture via threads', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads', emitStdStreams: true});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, [{
stdout: 'stdout message\n'
}, {
stderr: 'stderr message\n'
}]);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('excludes stdout/stderr capture via threads', function(done) {
var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads'});

var receivedEvents = []
pool.exec("stdStreams", [], {
on: function (payload) {
receivedEvents.push(payload)
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.deepStrictEqual(receivedEvents, []);

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
})

it('should offload a function to a worker', function (done) {
var pool = createPool({maxWorkers: 10});
Expand Down
15 changes: 15 additions & 0 deletions test/workers/console.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// a simple worker
var workerpool = require('../../');

function stdStreams() {
console.log("stdout message")
console.error("stderr message")
return new Promise(function (resolve, reject) {
resolve('done');
});
}

// create a worker and register some functions
workerpool.worker({
stdStreams: stdStreams,
});

0 comments on commit 78fe8cc

Please sign in to comment.