diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml
index 3ab56c1c..62e744eb 100644
--- a/.github/workflows/nodejs.yml
+++ b/.github/workflows/nodejs.yml
@@ -38,7 +38,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
- node-version: [18.x, 20.x, 22.x]
+ node-version: [20.x, 22.x]
runs-on: ${{matrix.os}}
steps:
- uses: actions/checkout@v4
diff --git a/.gitignore b/.gitignore
index 71b423d4..67e00598 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
node_modules
dist
coverage
+tsconfig.tsbuildinfo
\ No newline at end of file
diff --git a/docs/docs/api-reference/class.md b/docs/docs/api-reference/class.md
index 2e75a003..2f0f15df 100644
--- a/docs/docs/api-reference/class.md
+++ b/docs/docs/api-reference/class.md
@@ -55,55 +55,54 @@ This class extends [`EventEmitter`](https://nodejs.org/api/events.html) from Nod
returning control to the main thread (avoid having open handles within a thread). If still want to have the possibility
of having open handles or handle asynchrnous tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or setting `options.atomics` to `async`.
-** :::info
+ :::info
**Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
Workers should be designed to wait for all operations to finish before returning control to the main thread, if any background operations are still running
`async` can be of help (e.g. for cache warming, etc).
:::
-**
- - `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
- - `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
- main heap in MB.
- - `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for
- recently created objects.
- - `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used
- for generated code.
- - `stackSizeMb` : (`number`) The default maximum stack size for the thread.
- Small values may lead to unusable Worker instances. Default: 4
- - `env`: (`object`) If set, specifies the initial value of `process.env` inside
- the worker threads. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- - `argv`: (`any[]`) List of arguments that will be stringified and appended to
- `process.argv` in the worker. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- - `execArgv`: (`string[]`) List of Node.js CLI options passed to the worker.
- See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- - `workerData`: (`any`) Any JavaScript value that can be cloned and made
- available as `require('piscina').workerData`. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
- for details. Unlike regular Node.js Worker Threads, `workerData` must not
- specify any value requiring a `transferList`. This is because the `workerData`
- will be cloned for each pooled worker.
- - `taskQueue`: (`TaskQueue`) By default, Piscina uses a first-in-first-out
- queue for submitted tasks. The `taskQueue` option can be used to provide an
- alternative implementation. See [Custom Task Queues](https://github.com/piscinajs/piscina#custom_task_queues) for additional detail.
- - `niceIncrement`: (`number`) An optional value that decreases priority for
- the individual threads, i.e. the higher the value, the lower the priority
- of the Worker threads. This value is used on Unix/Windows and requires the
- optional [`@napi-rs/nice`](https://npmjs.org/package/@napi-rs/nice) module to be installed.
- See [`nice(2)`](https://linux.die.net/man/2/nice) and [`SetThreadPriority`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority) for more details.
- - `trackUnmanagedFds`: (`boolean`) An optional setting that, when `true`, will
- cause Workers to track file descriptors managed using `fs.open()` and
- `fs.close()`, and will close them automatically when the Worker exits.
- Defaults to `true`. (This option is only supported on Node.js 12.19+ and
- all Node.js versions higher than 14.6.0).
- - `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to
- complete all in-flight tasks when `close()` is called. The default is `30000`
- - `recordTiming`: (`boolean`) By default, run and wait time will be recorded
- for the pool. To disable, set to `false`.
- - `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- - `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
- option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
- - `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- - `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
- option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
+- `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
+ - `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
+ main heap in MB.
+ - `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for
+ recently created objects.
+ - `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used
+ for generated code.
+ - `stackSizeMb` : (`number`) The default maximum stack size for the thread.
+ Small values may lead to unusable Worker instances. Default: 4
+- `env`: (`object`) If set, specifies the initial value of `process.env` inside
+ the worker threads. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
+- `argv`: (`any[]`) List of arguments that will be stringified and appended to
+ `process.argv` in the worker. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
+- `execArgv`: (`string[]`) List of Node.js CLI options passed to the worker.
+ See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
+- `workerData`: (`any`) Any JavaScript value that can be cloned and made
+ available as `require('piscina').workerData`. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
+ for details. Unlike regular Node.js Worker Threads, `workerData` must not
+ specify any value requiring a `transferList`. This is because the `workerData`
+ will be cloned for each pooled worker.
+- `taskQueue`: (`TaskQueue`) By default, Piscina uses a first-in-first-out
+ queue for submitted tasks. The `taskQueue` option can be used to provide an
+ alternative implementation. See [Custom Task Queues](https://github.com/piscinajs/piscina#custom_task_queues) for additional detail.
+- `niceIncrement`: (`number`) An optional value that decreases priority for
+ the individual threads, i.e. the higher the value, the lower the priority
+ of the Worker threads. This value is used on Unix/Windows and requires the
+ optional [`@napi-rs/nice`](https://npmjs.org/package/@napi-rs/nice) module to be installed.
+ See [`nice(2)`](https://linux.die.net/man/2/nice) and [`SetThreadPriority`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority) for more details.
+- `trackUnmanagedFds`: (`boolean`) An optional setting that, when `true`, will
+ cause Workers to track file descriptors managed using `fs.open()` and
+ `fs.close()`, and will close them automatically when the Worker exits.
+ Defaults to `true`. (This option is only supported on Node.js 12.19+ and
+ all Node.js versions higher than 14.6.0).
+- `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to
+ complete all in-flight tasks when `close()` is called. The default is `30000`
+- `recordTiming`: (`boolean`) By default, run and wait time will be recorded
+ for the pool. To disable, set to `false`.
+- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
+- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
+ option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
+- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
+- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
+ option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
:::caution
Use caution when setting resource limits. Setting limits that are too low may
@@ -191,7 +190,7 @@ type PiscinaHistogramSummary = {
p99_9: number;
p99_99: number;
p99_999: number;
-}
+};
```
## `PiscinaLoadBalancer`
@@ -239,6 +238,7 @@ interface PiscinaWorker {
### Example: Custom Load Balancer
#### JavaScript
+
```js
@@ -279,6 +279,7 @@ piscina
```
#### TypeScript
+
```ts
diff --git a/docs/docs/api-reference/method.md b/docs/docs/api-reference/method.md
index 943eb7d0..937e9850 100644
--- a/docs/docs/api-reference/method.md
+++ b/docs/docs/api-reference/method.md
@@ -2,21 +2,22 @@
id: Methods
sidebar_position: 3
---
-## Method: `run(task[, options])`
+
+## Method: `run(task[, options]): T | Redeable`
Schedules a task to be run on a Worker thread.
-* `task`: Any value. This will be passed to the function that is exported from
+- `task`: Any value. This will be passed to the function that is exported from
`filename`.
-* `options`:
- * `transferList`: An optional lists of objects that is passed to
+- `options`:
+ - `transferList`: An optional lists of objects that is passed to
[`postMessage()`] when posting `task` to the Worker, which are transferred
rather than cloned.
- * `filename`: Optionally overrides the `filename` option passed to the
+ - `filename`: Optionally overrides the `filename` option passed to the
constructor for this task. If no `filename` was specified to the constructor,
this is mandatory.
- * `name`: Optionally overrides the exported worker function used for the task.
- * `abortSignal`: An `AbortSignal` instance. If passed, this can be used to
+ - `name`: Optionally overrides the exported worker function used for the task.
+ - `abortSignal`: An `AbortSignal` instance. If passed, this can be used to
cancel a task. If the task is already running, the corresponding `Worker`
thread will be stopped.
(More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
@@ -29,32 +30,42 @@ an error, the returned `Promise` will be rejected with that error.
If the task is aborted, the returned `Promise` is rejected with an error
as well.
-## Method: `runTask(task[, transferList][, filename][, abortSignal])`
+In case the worker return an `AsyncGenerator` or a `Generator`, the `run` method
+will resolve with a [`Readable`](https://nodejs.org/api/stream.html#class-streamreadable) stream that will emit the values produced by the generator.
-**Deprecated** -- Use `run(task, options)` instead.
+:::caution
+The return of a `Redeable` stream is experimental and is subject to further changes
+:::
-Schedules a task to be run on a Worker thread.
+**Example**
-* `task`: Any value. This will be passed to the function that is exported from
- `filename`.
-* `transferList`: An optional lists of objects that is passed to
- [`postMessage()`] when posting `task` to the Worker, which are transferred
- rather than cloned.
-* `filename`: Optionally overrides the `filename` option passed to the
- constructor for this task. If no `filename` was specified to the constructor,
- this is mandatory.
-* `signal`: An [`AbortSignal`][] instance. If passed, this can be used to
- cancel a task. If the task is already running, the corresponding `Worker`
- thread will be stopped.
- (More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
- events can be passed here.) Abortable tasks cannot share threads regardless
- of the `concurrentTasksPerWorker` options.
+`worker.js`
-This returns a `Promise` for the return value of the (async) function call
-made to the function exported from `filename`. If the (async) function throws
-an error, the returned `Promise` will be rejected with that error.
-If the task is aborted, the returned `Promise` is rejected with an error
-as well.
+```js
+module.exports = function* (length) {
+ for (let i = 0; i < length; i++) {
+ yield `${i}`;
+ }
+};
+```
+
+`main.js`
+
+```js
+const { resolve } = require('node:path');
+const { Piscina } = require('piscina');
+
+const pool = new Piscina({
+ filename: resolve(__dirname, 'worker.js'),
+});
+
+(async () => {
+ const stream = pool.run(10);
+ for await (const value of stream) {
+ console.log(value);
+ }
+})();
+```
## Method: `destroy()`
@@ -64,9 +75,9 @@ This returns a `Promise` that is fulfilled once all threads have stopped.
## Method: `close([options])`
-* `options`:
- * `force`: A `boolean` value that indicates whether to abort all tasks that
- are enqueued but not started yet. The default is `false`.
+- `options`:
+ - `force`: A `boolean` value that indicates whether to abort all tasks that
+ are enqueued but not started yet. The default is `false`.
It stops all Workers gracefully.
@@ -75,4 +86,4 @@ have completed and all threads have stopped.
This method is similar to `destroy()`, but with the difference that `close()`
will wait for the worker tasks to finish, while `destroy()`
-will abort them immediately.
\ No newline at end of file
+will abort them immediately.
diff --git a/package.json b/package.json
index 39364426..947e0cd7 100644
--- a/package.json
+++ b/package.json
@@ -10,10 +10,11 @@
"require": "./dist/main.js"
},
"engines": {
- "node": ">=18.x"
+ "node": ">=20.x"
},
"scripts": {
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
+ "build:clean": "rm -rf ./dist tsconfig.tsbuildinfo",
"lint": "eslint",
"test": "c8 tap",
"test:ci": "npm run lint && npm run build && npm run test:coverage",
diff --git a/src/index.ts b/src/index.ts
index 8bab56b2..af0800c2 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -37,6 +37,7 @@ import {
PiscinaWorker,
LeastBusyBalancer
} from './worker_pool';
+import { WorkerStream } from './worker_pool/worker_stream';
import {
AbortSignalAny,
AbortSignalEventTarget,
@@ -284,22 +285,28 @@ class ThreadPool {
// remove the `TaskInfo` associated with the Worker, which marks it as
// free again.
const taskInfo = workerInfo.taskInfos.get(taskId);
- workerInfo.taskInfos.delete(taskId);
-
- // TODO: we can abstract the task info handling
- // right into the pool.workers.taskDone method
- pool.workers.taskDone(workerInfo);
/* istanbul ignore if */
- if (taskInfo === undefined) {
+ if (taskInfo == null) {
const err = new Error(
`Unexpected message from Worker: ${inspect(message)}`);
pool.publicInterface.emit('error', err);
+ workerInfo.taskInfos.delete(taskId);
} else {
- taskInfo.done(message.error, result);
- }
+ // Iterator -- yield
+ if (message.kind === 1) {
+ taskInfo.done(message.error, message.state === 0 ? null : result, message.error != null || message.state === 0)
+ } else {
+ workerInfo.taskInfos.delete(taskId);
+ taskInfo.done(message.error, result, true);
- pool._processPendingMessages();
+ // TODO: we can abstract the task info handling
+ // right into the pool.workers.taskDone method
+ pool.workers.taskDone(workerInfo);
+
+ pool._processPendingMessages();
+ }
+ }
}
function onReady () {
@@ -369,7 +376,7 @@ class ThreadPool {
// If there are remaining unfinished tasks, call the callback that was
// passed to `postTask` with the error
for (const taskInfo of taskInfos) {
- taskInfo.done(err, null);
+ taskInfo.done(err, null, true);
}
} else if (!onlyErrorUnfinishedTasks) {
// If there are no unfinished tasks, instead emit an 'error' event
@@ -514,15 +521,31 @@ class ThreadPool {
transferList,
filename,
name,
- (err : Error | null, result : any) => {
+ (err, result, done) => {
+ if (done === false) {
+ if (taskInfo.redeable == null) {
+ taskInfo.redeable = new WorkerStream();
+ resolve(taskInfo.redeable);
+ }
+
+ taskInfo.redeable.push(result)
+ return;
+ } else if (done === true && taskInfo.redeable != null) {
+ if (err == null) taskInfo.redeable.push(null);
+ else taskInfo.redeable.destroy(err)
+ }
+
this.completed++;
if (taskInfo.started) {
this.histogram?.recordRunTime(performance.now() - taskInfo.started);
}
- if (err !== null) {
- reject(err);
- } else {
- resolve(result);
+
+ if (done === true) {
+ if (err !== null) {
+ reject(err);
+ } else {
+ resolve(result);
+ }
}
this._maybeDrain();
@@ -622,11 +645,11 @@ class ThreadPool {
this.destroying = true;
while (this.skipQueue.length > 0) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
- taskInfo.done(new Error('Terminating worker thread'));
+ taskInfo.done(new Error('Terminating worker thread'), null, true);
}
while (this.taskQueue.size > 0) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
- taskInfo.done(new Error('Terminating worker thread'));
+ taskInfo.done(new Error('Terminating worker thread'), null, true);
}
const exitEvents : Promise[] = [];
@@ -651,7 +674,7 @@ class ThreadPool {
for (let i = 0; i < skipQueueLength; i++) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
- taskInfo.done(new AbortError('pool is closed'));
+ taskInfo.done(new AbortError('pool is closed'), null, true);
} else {
this.skipQueue.push(taskInfo);
}
@@ -661,7 +684,7 @@ class ThreadPool {
for (let i = 0; i < taskQueueLength; i++) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
- taskInfo.done(new AbortError('pool is closed'));
+ taskInfo.done(new AbortError('pool is closed'), null, true);
} else {
this.taskQueue.push(taskInfo);
}
diff --git a/src/task_queue/index.ts b/src/task_queue/index.ts
index 711c231b..a839d0d8 100644
--- a/src/task_queue/index.ts
+++ b/src/task_queue/index.ts
@@ -8,11 +8,12 @@ import { isMovable } from '../common';
import { kTransferable, kValue, kQueueOptions } from '../symbols';
import type { Task, TaskQueue, PiscinaTask } from './common';
+import type { WorkerStream } from '../worker_pool/worker_stream';
export { ArrayTaskQueue } from './array_queue';
export { FixedQueue } from './fixed_queue';
-export type TaskCallback = (err: Error, result: any) => void
+export type TaskCallback = (err: Error, result: any, done: boolean) => void
// Grab the type of `transferList` off `MessagePort`. At the time of writing,
// only ArrayBuffer and MessagePort are valid, but let's avoid having to update
// our types here every time Node.js adds support for more objects.
@@ -56,6 +57,7 @@ export class TaskInfo extends AsyncResource implements Task {
workerInfo : WorkerInfo | null = null;
created : number;
started : number;
+ redeable : WorkerStream | null = null;
aborted = false;
_abortListener: (() => void) | null = null;
@@ -114,8 +116,11 @@ export class TaskInfo extends AsyncResource implements Task {
return ret;
}
- done (err : Error | null, result? : any) : void {
- this.runInAsyncScope(this.callback, null, err, result);
+ done (err : Error | null, result : any, done: boolean) : void {
+ this.runInAsyncScope(this.callback, null, err, result, done);
+
+ if (done === false) return;
+
this.emitDestroy(); // `TaskInfo`s are used only once.
// If an abort signal was used, remove the listener from it when
// done to make sure we do not accidentally leak.
diff --git a/src/types.ts b/src/types.ts
index 9b55bdf5..1cadce1a 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -27,6 +27,8 @@ export interface ReadyMessage {
export interface ResponseMessage {
taskId: number
result: any
+ kind: 0 | 1; // 0 normal - 1 iterator
+ state: 0 | 1; // 0 done - 1 yield
error: Error | null
time: number | null
}
diff --git a/src/worker.ts b/src/worker.ts
index 4bbb6b1f..98b9bf14 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -26,6 +26,10 @@ commonState.workerData = workerData;
function noop (): void {}
const handlerCache : Map = new Map();
+const GeneratorFunctionConstructor = (function*(){}).constructor.name;
+const AsyncGeneratorConstructor = (async function*(){}).constructor.name;
+const AsyncFunctionConstructor = (async function(){}).constructor.name;
+const FunctionConstructor = (function(){}).constructor.name;
let useAtomics : boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1';
let useAsyncAtomics : boolean = process.env.PISCINA_ENABLE_ASYNC_ATOMICS === '1';
@@ -123,9 +127,6 @@ function atomicsWaitLoop (port : MessagePort, sharedBuffer : Int32Array) {
// running, we wait for a signal from the parent thread using Atomics.wait(),
// and read the message from the port instead of generating an event,
// in order to avoid that overhead.
- // The one catch is that this stops asynchronous operations that are still
- // running from proceeding. Generally, tasks should not spawn asynchronous
- // operations without waiting for them to finish, though.
if (useAsyncAtomics === true) {
// @ts-expect-error - for some reason not supported by TS
@@ -177,13 +178,88 @@ async function onMessage (
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
- let result = await handler(task);
- if (isMovable(result)) {
- transferList = transferList.concat(result[kTransferable]);
- result = result[kValue];
+
+ let result: any = null;
+ switch (handler.constructor.name) {
+ case FunctionConstructor: {
+ result = handler(task);
+
+ // Handle theneable
+ if (result?.then != null) {
+ result = await result;
+ }
+
+ if (isMovable(result)) {
+ transferList = transferList.concat(result[kTransferable]);
+ result = result[kValue];
+ }
+
+ break
+ }
+ case AsyncFunctionConstructor: {
+ result = await handler(task)
+ if (isMovable(result)) {
+ transferList = transferList.concat(result[kTransferable]);
+ result = result[kValue];
+ }
+ break;
+ }
+ case AsyncGeneratorConstructor: {
+ // We only support string or buffer
+ for await (const chunk of handler(task)) {
+ // @ts-expect-error
+ process._rawDebug(`async iterator - chunk type ${typeof chunk}`);
+ if (typeof chunk !== 'string'
+ && Buffer.isBuffer(chunk) === false
+ && ArrayBuffer.isView(chunk) === false) {
+ throw new TypeError('AsyncIterators should only return string, buffer or typed arrays')
+ }
+
+ // TODO: change the shape of the response
+ // Ideally we can hint the main thread that the
+ // response will be streamed
+ const res = {
+ taskId,
+ kind: 1,
+ state: 1,
+ result: chunk,
+ error: null,
+ };
+
+ queueMicrotask(() => { port.postMessage(res); })
+ }
+ break;
+ }
+ case GeneratorFunctionConstructor: {
+ // We only support string or buffer
+ for (const chunk of handler(task)) {
+ if (typeof chunk !== 'string'
+ && Buffer.isBuffer(chunk) === false
+ && ArrayBuffer.isView(chunk) === false) {
+ throw new TypeError('AsyncIterators should only return string, buffer or typed arrays')
+ }
+
+ const res = {
+ taskId,
+ kind: 1,
+ state: 1,
+ result: chunk,
+ error: null,
+ };
+
+ port.postMessage(res);
+ }
+ break;
+ }
+ default: {
+ throw new Error(`Unsupported handler exported from ${filename}`);
+ }
}
+
response = {
taskId,
+ kind: 0,
+ state: 0,
result,
error: null,
time: start == null ? null : Math.round(performance.now() - start)
@@ -205,8 +281,10 @@ async function onMessage (
} catch (error) {
response = {
taskId,
+ kind: 0,
+ state: 0,
result: null,
- // It may be worth taking a look at the error cloning algorithm we
+ // TODO: It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error: error,
time: start == null ? null : Math.round(performance.now() - start)
diff --git a/src/worker_pool/index.ts b/src/worker_pool/index.ts
index 199b1811..309b644c 100644
--- a/src/worker_pool/index.ts
+++ b/src/worker_pool/index.ts
@@ -66,7 +66,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {
this.port.close();
this.clearIdleTimeout();
for (const taskInfo of this.taskInfos.values()) {
- taskInfo.done(Errors.ThreadTermination());
+ taskInfo.done(Errors.ThreadTermination(), null, true);
}
this.taskInfos.clear();
@@ -125,7 +125,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {
} catch (err) {
// This would mostly happen if e.g. message contains unserializable data
// or transferList is invalid.
- taskInfo.done(err);
+ taskInfo.done(err, null, true);
return;
}
diff --git a/src/worker_pool/worker_stream.ts b/src/worker_pool/worker_stream.ts
new file mode 100644
index 00000000..ff126b14
--- /dev/null
+++ b/src/worker_pool/worker_stream.ts
@@ -0,0 +1,9 @@
+import { Readable } from 'node:stream';
+
+class WorkerStream extends Readable {
+ _read(_size: number): void {
+
+ }
+}
+
+export { WorkerStream };
diff --git a/test/fixtures/async-iterator.js b/test/fixtures/async-iterator.js
new file mode 100644
index 00000000..05d6d3bc
--- /dev/null
+++ b/test/fixtures/async-iterator.js
@@ -0,0 +1,11 @@
+// eslint-disable-next-line no-eval
+module.exports = async function* ({ length = 5, throwNext = false } = {}) {
+ const median = Math.floor(length / 2);
+ for (let i = 0; i < length; i++) {
+ if (throwNext && i === median) {
+ throw new Error('Thrown error');
+ }
+
+ yield `${i}`;
+ }
+};
diff --git a/test/fixtures/bad-iterators.js b/test/fixtures/bad-iterators.js
new file mode 100644
index 00000000..e94c27a6
--- /dev/null
+++ b/test/fixtures/bad-iterators.js
@@ -0,0 +1,33 @@
+// eslint-disable-next-line no-eval
+module.exports = {
+ asyncIterator: async function* () {
+ yield Buffer.from('1');
+ yield 2;
+ },
+ asyncIterator2: async function* () {
+ yield new Int8Array([0x31]);
+ yield {}
+ },
+ asyncIterator3: async function* () {
+ yield '1';
+ yield []
+ },
+ asyncIterator4: async function* () {
+ yield new Set();
+ },
+ syncIterator: function* () {
+ yield {};
+ },
+ syncIterator2: async function* () {
+ yield new Int8Array([0x31]);
+ yield {}
+ },
+ syncIterator3: async function* () {
+ yield '1';
+ yield []
+ },
+ syncIterator4: async function* () {
+ yield Buffer.from('1');
+ yield 2;
+ },
+};
diff --git a/test/fixtures/iterator.js b/test/fixtures/iterator.js
new file mode 100644
index 00000000..c0cf969b
--- /dev/null
+++ b/test/fixtures/iterator.js
@@ -0,0 +1,11 @@
+// eslint-disable-next-line no-eval
+module.exports = function* ({ length, throwNext }) {
+ const median = Math.floor(length / 2);
+ for (let i = 0; i < length; i++) {
+ if (throwNext && i === median) {
+ throw new Error('Thrown error');
+ }
+
+ yield `${i}`;
+ }
+};
diff --git a/test/iterators.ts b/test/iterators.ts
new file mode 100644
index 00000000..0f927145
--- /dev/null
+++ b/test/iterators.ts
@@ -0,0 +1,202 @@
+import { Readable } from 'node:stream';
+import { resolve } from 'node:path';
+
+import { test } from 'tap';
+
+import Piscina from '..';
+
+test('should support iterator', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'iterator.js'),
+ });
+
+ t.plan(1);
+ pool.run({ length: 10 }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('end', () => {
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '0123456789');
+ });
+ });
+});
+
+test('should handle iterator throw', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'iterator.js'),
+ });
+
+ t.plan(2);
+ pool.run({ length: 5, throwNext: true }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(err.message, 'Thrown error');
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '01');
+ });
+ });
+});
+
+test('should handle iterator throw (async)', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'async-iterator.js'),
+ });
+
+ t.plan(2);
+ pool.run({ length: 5, throwNext: true }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(err.message, 'Thrown error');
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '01');
+ });
+ });
+});
+
+test('should support async iterator', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'async-iterator.js'),
+ });
+
+ t.plan(1);
+ pool.run({ length: 10 }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('end', () => {
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '0123456789');
+ });
+ });
+});
+
+test('should throw on invalid output (async)', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'bad-iterators.js'),
+ });
+
+ t.plan(7);
+ pool.run('', { name: 'asyncIterator' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+ pool.run('', { name: 'asyncIterator2' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+ pool.run('', { name: 'asyncIterator3' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+ pool.run('', { name: 'asyncIterator4' }).then(
+ () => {
+ t.fail('should not succeed');
+ },
+ (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ }
+ );
+});
+
+test('should throw on invalid output', (t) => {
+ const pool = new Piscina({
+ filename: resolve(__dirname, 'fixtures', 'bad-iterators.js'),
+ });
+
+ t.plan(7);
+ pool.run('', { name: 'syncIterator' }).then(
+ () => {
+ t.fail('should not succeed');
+ },
+ (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ }
+ );
+ pool.run('', { name: 'syncIterator2' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+ pool.run('', { name: 'syncIterator3' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+ pool.run('', { name: 'syncIterator4' }).then((red: Readable) => {
+ const chunks: Buffer[] = [];
+ red.on('data', (chunk) => {
+ chunks.push(chunk);
+ });
+
+ red.on('error', (err) => {
+ t.equal(
+ err.message,
+ 'AsyncIterators should only return string, buffer or typed arrays'
+ );
+ t.equal(Buffer.concat(chunks).toString('utf-8'), '1');
+ });
+ });
+});
diff --git a/tsconfig.json b/tsconfig.json
index 1cb3ed5f..c5df8ee3 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -1,6 +1,7 @@
{
"compilerOptions": {
- "target": "es2019",
+ "incremental": true,
+ "target": "ES2022",
"module": "commonjs",
"moduleResolution": "node",
"lib": ["es2019"],