Skip to content

Commit 6e5cbc9

Browse files
RaishavHanspalsindresorhus
andauthoredJan 22, 2025··
Add .setPriority() for updating priority of a queued promise function (#209)
Co-authored-by: Sindre Sorhus <sindresorhus@gmail.com>
1 parent cbdbbb7 commit 6e5cbc9

File tree

6 files changed

+270
-2
lines changed

6 files changed

+270
-2
lines changed
 

‎readme.md

+44
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ Default: `0`
139139

140140
Priority of operation. Operations with greater priority will be scheduled first.
141141

142+
##### id
143+
144+
Type `string`
145+
146+
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
147+
142148
##### signal
143149

144150
[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an [error](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason). If the operation is already running, the signal will need to be handled by the operation itself.
@@ -238,6 +244,44 @@ console.log(queue.sizeBy({priority: 0}));
238244
//=> 1
239245
```
240246

247+
#### .setPriority(id, priority)
248+
249+
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.
250+
251+
For example, this can be used to prioritize a promise function to run earlier.
252+
253+
```js
254+
import PQueue from 'p-queue';
255+
256+
const queue = new PQueue({concurrency: 1});
257+
258+
queue.add(async () => '🦄', {priority: 1});
259+
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
260+
queue.add(async () => '🦄', {priority: 1});
261+
queue.add(async () => '🦄', {priority: 1});
262+
263+
queue.setPriority('🦀', 2);
264+
```
265+
266+
In this case, the promise function with `id: '🦀'` runs second.
267+
268+
You can also deprioritize a promise function to delay its execution:
269+
270+
```js
271+
import PQueue from 'p-queue';
272+
273+
const queue = new PQueue({concurrency: 1});
274+
275+
queue.add(async () => '🦄', {priority: 1});
276+
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
277+
queue.add(async () => '🦄');
278+
queue.add(async () => '🦄', {priority: 0});
279+
280+
queue.setPriority('🦀', -1);
281+
```
282+
283+
Here, the promise function with `id: '🦀'` executes last.
284+
241285
#### .pending
242286

243287
Number of running items (no longer in the queue).

‎source/index.ts

+46
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
4343

4444
readonly #throwOnTimeout: boolean;
4545

46+
// Use to assign a unique identifier to a promise function, if not explicitly specified
47+
#idAssigner = 1n;
48+
4649
/**
4750
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
4851
@@ -228,12 +231,55 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
228231
});
229232
}
230233

234+
/**
235+
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.
236+
237+
For example, this can be used to prioritize a promise function to run earlier.
238+
239+
```js
240+
import PQueue from 'p-queue';
241+
242+
const queue = new PQueue({concurrency: 1});
243+
244+
queue.add(async () => '🦄', {priority: 1});
245+
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
246+
queue.add(async () => '🦄', {priority: 1});
247+
queue.add(async () => '🦄', {priority: 1});
248+
249+
queue.setPriority('🦀', 2);
250+
```
251+
252+
In this case, the promise function with `id: '🦀'` runs second.
253+
254+
You can also deprioritize a promise function to delay its execution:
255+
256+
```js
257+
import PQueue from 'p-queue';
258+
259+
const queue = new PQueue({concurrency: 1});
260+
261+
queue.add(async () => '🦄', {priority: 1});
262+
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
263+
queue.add(async () => '🦄');
264+
queue.add(async () => '🦄', {priority: 0});
265+
266+
queue.setPriority('🦀', -1);
267+
```
268+
Here, the promise function with `id: '🦀'` executes last.
269+
*/
270+
setPriority(id: string, priority: number) {
271+
this.#queue.setPriority(id, priority);
272+
}
273+
231274
/**
232275
Adds a sync or async task to the queue. Always returns a promise.
233276
*/
234277
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
235278
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
236279
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
280+
// In case `id` is not defined.
281+
options.id ??= (this.#idAssigner++).toString();
282+
237283
options = {
238284
timeout: this.timeout,
239285
throwOnTimeout: this.#throwOnTimeout,

‎source/options.ts

+5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ export type QueueAddOptions = {
6969
@default 0
7070
*/
7171
readonly priority?: number;
72+
73+
/**
74+
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
75+
*/
76+
id?: string;
7277
} & TaskOptions & TimeoutOptions;
7378

7479
export type TaskOptions = {

‎source/priority-queue.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
1717

1818
const element = {
1919
priority: options.priority,
20+
id: options.id,
2021
run,
2122
};
2223

23-
if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
24+
if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) {
2425
this.#queue.push(element);
2526
return;
2627
}
@@ -32,6 +33,16 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
3233
this.#queue.splice(index, 0, element);
3334
}
3435

36+
setPriority(id: string, priority: number) {
37+
const index: number = this.#queue.findIndex((element: Readonly<PriorityQueueOptions>) => element.id === id);
38+
if (index === -1) {
39+
throw new ReferenceError(`No promise function with the id "${id}" exists in the queue.`);
40+
}
41+
42+
const [item] = this.#queue.splice(index, 1);
43+
this.enqueue(item!.run, {priority, id});
44+
}
45+
3546
dequeue(): RunFunction | undefined {
3647
const item = this.#queue.shift();
3748
return item?.run;

‎source/queue.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export type Queue<Element, Options> = {
55
filter: (options: Readonly<Partial<Options>>) => Element[];
66
dequeue: () => Element | undefined;
77
enqueue: (run: Element, options?: Partial<Options>) => void;
8+
setPriority: (id: string, priority: number) => void;
89
};

‎test/test.ts

+162-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import inRange from 'in-range';
66
import timeSpan from 'time-span';
77
import randomInt from 'random-int';
88
import pDefer from 'p-defer';
9-
import PQueue, {AbortError} from '../source/index.js';
9+
import PQueue from '../source/index.js';
1010

1111
const fixture = Symbol('fixture');
1212

@@ -1134,3 +1134,164 @@ test('aborting multiple jobs at the same time', async t => {
11341134
await t.throwsAsync(task2, {instanceOf: DOMException});
11351135
t.like(queue, {size: 0, pending: 0});
11361136
});
1137+
1138+
test('.setPriority() - execute a promise before planned', async t => {
1139+
const result: string[] = [];
1140+
const queue = new PQueue({concurrency: 1});
1141+
queue.add(async () => {
1142+
await delay(400);
1143+
result.push('🐌');
1144+
}, {id: '🐌'});
1145+
queue.add(async () => {
1146+
await delay(400);
1147+
result.push('🦆');
1148+
}, {id: '🦆'});
1149+
queue.add(async () => {
1150+
await delay(400);
1151+
result.push('🐢');
1152+
}, {id: '🐢'});
1153+
queue.setPriority('🐢', 1);
1154+
await queue.onIdle();
1155+
t.deepEqual(result, ['🐌', '🐢', '🦆']);
1156+
});
1157+
1158+
test('.setPriority() - execute a promise after planned', async t => {
1159+
const result: string[] = [];
1160+
const queue = new PQueue({concurrency: 1});
1161+
queue.add(async () => {
1162+
await delay(400);
1163+
result.push('🐌');
1164+
}, {id: '🐌'});
1165+
queue.add(async () => {
1166+
await delay(400);
1167+
result.push('🦆');
1168+
}, {id: '🦆'});
1169+
queue.add(async () => {
1170+
await delay(400);
1171+
result.push('🦆');
1172+
}, {id: '🦆'});
1173+
queue.add(async () => {
1174+
await delay(400);
1175+
result.push('🐢');
1176+
}, {id: '🐢'});
1177+
queue.add(async () => {
1178+
await delay(400);
1179+
result.push('🦆');
1180+
}, {id: '🦆'});
1181+
queue.add(async () => {
1182+
await delay(400);
1183+
result.push('🦆');
1184+
}, {id: '🦆'});
1185+
queue.setPriority('🐢', -1);
1186+
await queue.onIdle();
1187+
t.deepEqual(result, ['🐌', '🦆', '🦆', '🦆', '🦆', '🐢']);
1188+
});
1189+
1190+
test('.setPriority() - execute a promise before planned - concurrency 2', async t => {
1191+
const result: string[] = [];
1192+
const queue = new PQueue({concurrency: 2});
1193+
queue.add(async () => {
1194+
await delay(400);
1195+
result.push('🐌');
1196+
}, {id: '🐌'});
1197+
queue.add(async () => {
1198+
await delay(400);
1199+
result.push('🦆');
1200+
}, {id: '🦆'});
1201+
queue.add(async () => {
1202+
await delay(400);
1203+
result.push('🐢');
1204+
}, {id: '🐢'});
1205+
queue.add(async () => {
1206+
await delay(400);
1207+
result.push('⚡️');
1208+
}, {id: '⚡️'});
1209+
queue.setPriority('⚡️', 1);
1210+
await queue.onIdle();
1211+
t.deepEqual(result, ['🐌', '🦆', '⚡️', '🐢']);
1212+
});
1213+
1214+
test('.setPriority() - execute a promise before planned - concurrency 3', async t => {
1215+
const result: string[] = [];
1216+
const queue = new PQueue({concurrency: 3});
1217+
queue.add(async () => {
1218+
await delay(400);
1219+
result.push('🐌');
1220+
}, {id: '🐌'});
1221+
queue.add(async () => {
1222+
await delay(400);
1223+
result.push('🦆');
1224+
}, {id: '🦆'});
1225+
queue.add(async () => {
1226+
await delay(400);
1227+
result.push('🐢');
1228+
}, {id: '🐢'});
1229+
queue.add(async () => {
1230+
await delay(400);
1231+
result.push('⚡️');
1232+
}, {id: '⚡️'});
1233+
queue.add(async () => {
1234+
await delay(400);
1235+
result.push('🦀');
1236+
}, {id: '🦀'});
1237+
queue.setPriority('🦀', 1);
1238+
await queue.onIdle();
1239+
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
1240+
});
1241+
1242+
test('.setPriority() - execute a multiple promise before planned, with variable priority', async t => {
1243+
const result: string[] = [];
1244+
const queue = new PQueue({concurrency: 2});
1245+
queue.add(async () => {
1246+
await delay(400);
1247+
result.push('🐌');
1248+
}, {id: '🐌'});
1249+
queue.add(async () => {
1250+
await delay(400);
1251+
result.push('🦆');
1252+
}, {id: '🦆'});
1253+
queue.add(async () => {
1254+
await delay(400);
1255+
result.push('🐢');
1256+
}, {id: '🐢'});
1257+
queue.add(async () => {
1258+
await delay(400);
1259+
result.push('⚡️');
1260+
}, {id: '⚡️'});
1261+
queue.add(async () => {
1262+
await delay(400);
1263+
result.push('🦀');
1264+
}, {id: '🦀'});
1265+
queue.setPriority('⚡️', 1);
1266+
queue.setPriority('🦀', 2);
1267+
await queue.onIdle();
1268+
t.deepEqual(result, ['🐌', '🦆', '🦀', '⚡️', '🐢']);
1269+
});
1270+
1271+
test('.setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`', async t => {
1272+
const result: string[] = [];
1273+
const queue = new PQueue({concurrency: 3});
1274+
queue.add(async () => {
1275+
await delay(400);
1276+
result.push('🐌');
1277+
});
1278+
queue.add(async () => {
1279+
await delay(400);
1280+
result.push('🦆');
1281+
});
1282+
queue.add(async () => {
1283+
await delay(400);
1284+
result.push('🐢');
1285+
});
1286+
queue.add(async () => {
1287+
await delay(400);
1288+
result.push('⚡️');
1289+
});
1290+
queue.add(async () => {
1291+
await delay(400);
1292+
result.push('🦀');
1293+
});
1294+
queue.setPriority('5', 1);
1295+
await queue.onIdle();
1296+
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
1297+
});

0 commit comments

Comments
 (0)
Please sign in to comment.