Skip to content

Commit

Permalink
Move cache aborting to build queue
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeLane committed Nov 17, 2023
1 parent 863b5bd commit 858efed
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 41 deletions.
30 changes: 27 additions & 3 deletions packages/core/core/src/Parcel.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ export default class Parcel {
let result = await this._build({startTime});
await this._end();

await this.#requestTracker.writeToCache();

if (result.type === 'buildFailure') {
throw new BuildError(result.diagnostics);
}
Expand All @@ -178,6 +180,28 @@ export default class Parcel {
await this.#disposable.dispose();
}

async writeRequestTrackerToCache(): Promise<void> {
if (this.#watchQueue.getNumWaiting() === 0) {
// If there's no queued events, we are safe to write the request graph to disk
const abortController = new AbortController();

const unsubscribe = this.#watchQueue.subscribeToAdd(() => {
abortController.abort();
});

try {
await this.#requestTracker.writeToCache(abortController.signal);
} catch (err) {
if (!abortController.signal.aborted) {
// We expect abort errors if we interrupt the cache write
throw err;
}
}

unsubscribe();
}
}

async _startNextBuild(): Promise<?BuildEvent> {
this.#watchAbortController = new AbortController();
await this.#farm.callAllWorkers('clearConfigCache', []);
Expand All @@ -197,6 +221,9 @@ export default class Parcel {
if (!(err instanceof BuildAbortError)) {
throw err;
}
} finally {
// If the build passes or fails, we want to cache the request graph
await this.writeRequestTrackerToCache();
}
}

Expand Down Expand Up @@ -358,7 +385,6 @@ export default class Parcel {
createValidationRequest({optionsRef: this.#optionsRef, assetRequests}),
{force: assetRequests.length > 0},
);
await this.#requestTracker.writeToCache();
return event;
} catch (e) {
if (e instanceof BuildAbortError) {
Expand All @@ -372,8 +398,6 @@ export default class Parcel {
};

await this.#reporterRunner.report(event);
await this.#requestTracker.writeToCache();

return event;
} finally {
if (this.isProfiling) {
Expand Down
31 changes: 15 additions & 16 deletions packages/core/core/src/RequestTracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ export default class RequestTracker {
return {api, subRequestContentKeys};
}

async writeToCache() {
async writeToCache(signal?: AbortSignal) {
const cacheKey = getCacheKey(this.options);
const requestGraphKey = `requestGraph:${hashString(cacheKey)}`;
const snapshotKey = `snapshot:${hashString(cacheKey)}`;
Expand All @@ -1115,17 +1115,19 @@ export default class RequestTracker {
// $FlowFixMe serialise input is any type
contents: any,
): Promise<void> => {
if (!this.signal?.aborted) {
await this.options.cache.setLargeBlob(
key,
serialize(contents),
this.signal
? {
signal: this.signal,
}
: undefined,
);
if (signal?.aborted) {
throw new Error('Serialization was aborted');
}

await this.options.cache.setLargeBlob(
key,
serialize(contents),
signal
? {
signal: signal,
}
: undefined,
);
};

const serialisedGraph = this.graph.serialize();
Expand All @@ -1151,14 +1153,11 @@ export default class RequestTracker {
}
}

for (let i = 0; i * NODES_PER_BLOB < serialisedGraph.nodes.length; i += 1) {
for (let i = 0; i * NODES_PER_BLOB < cacheableNodes.length; i += 1) {
promises.push(
serialiseAndSet(
`requestGraph:nodes:${i}:${hashString(cacheKey)}`,
serialisedGraph.nodes.slice(
i * NODES_PER_BLOB,
(i + 1) * NODES_PER_BLOB,
),
cacheableNodes.slice(i * NODES_PER_BLOB, (i + 1) * NODES_PER_BLOB),
),
);
}
Expand Down
27 changes: 5 additions & 22 deletions packages/core/core/test/RequestTracker.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// @flow strict-local

import assert from 'assert';
import {AbortController} from 'abortcontroller-polyfill/dist/cjs-ponyfill';
import nullthrows from 'nullthrows';
import RequestTracker, {type RunAPI} from '../src/RequestTracker';
import WorkerFarm from '@parcel/workers';
Expand Down Expand Up @@ -187,30 +188,12 @@ describe('RequestTracker', () => {

it('should stop writing to cache when the abort controller aborts', async () => {
let tracker = new RequestTracker({farm, options});
let p = tracker
.runRequest({
id: 'abc',
type: 'path_request',
run: async () => {
await Promise.resolve('hello');
},
input: null,
})
.then(null, () => {
/* do nothing */
});
await p;
// $FlowFixMe
tracker.setSignal({aborted: true});

// $FlowFixMe[incompatible-cast]
const fsEvents = (options.outputFS: MemoryFS).events;
const initialCount = fsEvents.length;

await tracker.writeToCache();
const abortController = new AbortController();
abortController.abort();

// Caching writes to the FS, if there's more events than before - we haven't aborted correctly
assert.deepEqual([], fsEvents.slice(initialCount));
// $FlowFixMe[prop-missing] Rejects is missing on
await assert.rejects(tracker.writeToCache(abortController.signal));
});

it('should not requeue requests if the previous request is still running', async () => {
Expand Down
13 changes: 13 additions & 0 deletions packages/core/utils/src/PromiseQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default class PromiseQueue<T> {
_error: mixed;
_count: number = 0;
_results: Array<T> = [];
_addSubscriptions: Set<() => void> = new Set();

constructor(opts: PromiseQueueOpts = {maxConcurrent: Infinity}) {
if (opts.maxConcurrent <= 0) {
Expand Down Expand Up @@ -43,12 +44,24 @@ export default class PromiseQueue<T> {

this._queue.push(wrapped);

for (const addFn of this._addSubscriptions) {
addFn();
}

if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) {
this._next();
}
});
}

subscribeToAdd(fn: () => void): () => void {
this._addSubscriptions.add(fn);

return () => {
this._addSubscriptions.delete(fn);
};
}

run(): Promise<Array<T>> {
if (this._runPromise != null) {
return this._runPromise;
Expand Down
28 changes: 28 additions & 0 deletions packages/core/utils/test/PromiseQueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import assert from 'assert';
import randomInt from 'random-int';

import PromiseQueue from '../src/PromiseQueue';
import sinon from 'sinon';

describe('PromiseQueue', () => {
it('run() should resolve when all async functions in queue have completed', async () => {
Expand Down Expand Up @@ -72,4 +73,31 @@ describe('PromiseQueue', () => {

await queue.run();
});

it('.add() should notify subscribers', async () => {
const queue = new PromiseQueue();

const subscribedFn = sinon.spy();
queue.subscribeToAdd(subscribedFn);

const promise = queue.add(() => Promise.resolve());
await queue.run();
await promise;

assert(subscribedFn.called);
});

it('.subscribeToAdd() should allow unsubscribing', async () => {
const queue = new PromiseQueue();

const subscribedFn = sinon.spy();
const unsubscribe = queue.subscribeToAdd(subscribedFn);
unsubscribe();

const promise = queue.add(() => Promise.resolve());
await queue.run();
await promise;

assert(!subscribedFn.called);
});
});

0 comments on commit 858efed

Please sign in to comment.