Skip to content

Commit

Permalink
feat: pool.recycleWorkers() method
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio committed Jun 30, 2023
1 parent 1b6ca55 commit 89df201
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
#### Pool methods
- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead.
- `recycleWorkers()`: Waits for all current tasks to finish and re-creates all workers. Can be used to force isolation imperatively even when `isolateWorkers` is disabled.
#### Exports
Expand Down
42 changes: 39 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
lastSeenResponseCount: number = 0
usedMemory?: number
onMessage: ResponseCallback
shouldRecycle?: boolean

constructor(
worker: Worker,
Expand Down Expand Up @@ -995,17 +996,27 @@ class ThreadPool {
}

shouldRecycleWorker(taskInfo?: TaskInfo): boolean {
// Worker could be set to recycle by pool's imperative methods
if (taskInfo?.workerInfo?.shouldRecycle) {
return true
}

// When `isolateWorkers` is enabled, remove the worker after task is finished
const isWorkerIsolated = this.options.isolateWorkers && taskInfo?.workerInfo
if (this.options.isolateWorkers && taskInfo?.workerInfo) {
return true
}

// When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit
const isWorkersMemoryLimitReached =
if (
!this.options.isolateWorkers &&
this.options.maxMemoryLimitBeforeRecycle !== undefined &&
(taskInfo?.workerInfo?.usedMemory || 0) >
this.options.maxMemoryLimitBeforeRecycle
) {
return true
}

return Boolean(isWorkerIsolated || isWorkersMemoryLimitReached)
return false
}

pendingCapacity(): number {
Expand Down Expand Up @@ -1041,6 +1052,27 @@ class ThreadPool {

await Promise.all(exitEvents)
}

async recycleWorkers() {
const exitEvents: Promise<any[]>[] = []

Array.from(this.workers).filter((workerInfo) => {
exitEvents.push(once(workerInfo!.worker, 'exit'))

// Remove idle workers
if (workerInfo.currentUsage() === 0) {
this._removeWorker(workerInfo!)
}
// Mark on-going workers for recycling
else {
workerInfo.shouldRecycle = true
}
})

await Promise.all(exitEvents)

this._ensureMinimumWorkers()
}
}

class Tinypool extends EventEmitterAsyncResource {
Expand Down Expand Up @@ -1116,6 +1148,10 @@ class Tinypool extends EventEmitterAsyncResource {
pool.taskQueue.cancel()
}

async recycleWorkers() {
await this.#pool.recycleWorkers()
}

get completed(): number {
return this.#pool.completed
}
Expand Down
68 changes: 68 additions & 0 deletions test/isolation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { dirname, resolve } from 'path'
import { Tinypool } from 'tinypool'
import { fileURLToPath } from 'url'

const __dirname = dirname(fileURLToPath(import.meta.url))

test('idle workers can be recycled', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 4,
maxThreads: 4,
isolateWorkers: false,
})

function getThreadIds() {
return pool.threads.map((thread) => thread!.threadId).sort()
}

expect(pool.threads).toHaveLength(4)
const initialThreadIds = getThreadIds()

await Promise.all(times(4)(() => pool.run({})))
expect(getThreadIds()).toStrictEqual(initialThreadIds)

await pool.recycleWorkers()
expect(pool.threads).toHaveLength(4)

const newThreadIds = getThreadIds()
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))

await Promise.all(times(4)(() => pool.run({})))
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))
})

test('running workers can recycle after task execution finishes', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 4,
maxThreads: 4,
isolateWorkers: false,
})

function getThreadIds() {
return pool.threads.map((thread) => thread!.threadId).sort()
}

expect(pool.threads).toHaveLength(4)
const initialThreadIds = getThreadIds()
expect(getThreadIds()).toStrictEqual(initialThreadIds)

const tasks = [
...times(2)(() => pool.run({ time: 500 })),
...times(2)(() => pool.run({ time: 2000 })),
]

await Promise.all(tasks.slice(0, 2))
await pool.recycleWorkers()
await Promise.all(tasks)

const newThreadIds = getThreadIds()
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))
})

function times(count: number) {
return function run<T>(fn: () => T): T[] {
return Array(count).fill(0).map(fn)
}
}

0 comments on commit 89df201

Please sign in to comment.