-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Break up request graph cache serialisation and run after build completion #9384
Changes from 9 commits
c3fd8f2
c06fa33
863b5bd
d93e422
63f0017
0cfdedb
d1e726d
b82ab4c
07302ca
80507da
730c274
e5d8565
51ffd03
84d0e1d
ed110a1
48f1d25
96c1abc
c88cab5
921c7ba
6d339a6
3965abb
e0b3ba2
3d8b1e3
0941434
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,25 +105,34 @@ export class FSCache implements Cache { | |
async setLargeBlob(key: string, contents: Buffer | string): Promise<void> { | ||
const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK); | ||
|
||
const writePromises: Promise<void>[] = []; | ||
if (chunks === 1) { | ||
// If there's one chunk, don't slice the content | ||
await this.fs.writeFile(this.#getFilePath(key, 0), contents); | ||
return; | ||
} | ||
|
||
const writePromises: Promise<void>[] = []; | ||
for (let i = 0; i < chunks; i += 1) { | ||
writePromises.push( | ||
this.fs.writeFile( | ||
this.#getFilePath(key, i), | ||
typeof contents === 'string' | ||
? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK) | ||
: contents.subarray( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
), | ||
), | ||
this.fs.writeFile(this.#getFilePath(key, 0), contents), | ||
); | ||
} else { | ||
for (let i = 0; i < chunks; i += 1) { | ||
writePromises.push( | ||
this.fs.writeFile( | ||
this.#getFilePath(key, i), | ||
typeof contents === 'string' | ||
? contents.slice( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
) | ||
: contents.subarray( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
), | ||
), | ||
); | ||
} | ||
} | ||
|
||
// If there's already a file following this chunk, it's old and should be removed | ||
if (await this.fs.exists(this.#getFilePath(key, chunks))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note that async existence checking is long deprecated in Node, and can lead to race conditions: https://nodejs.org/api/fs.html#fsexistspath-callback (i.e. the recommended approach would be to just try the operation and handle the resulting error) |
||
writePromises.push(this.fs.unlink(this.#getFilePath(key, chunks))); | ||
} | ||
|
||
await Promise.all(writePromises); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ import type {FilePath} from '@parcel/types'; | |
import type {Cache} from './types'; | ||
import type {Readable, Writable} from 'stream'; | ||
|
||
import type {AbortSignal} from 'abortcontroller-polyfill/dist/cjs-ponyfill'; | ||
import stream from 'stream'; | ||
import path from 'path'; | ||
import {promisify} from 'util'; | ||
|
@@ -111,27 +112,47 @@ export class LMDBCache implements Cache { | |
return Buffer.concat(await Promise.all(buffers)); | ||
} | ||
|
||
async setLargeBlob(key: string, contents: Buffer | string): Promise<void> { | ||
async setLargeBlob( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any elegant way to share the implementation between the two Cache types as (AFAICT) they're identical? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instantiate a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds good to me, will clean this up |
||
key: string, | ||
contents: Buffer | string, | ||
options?: {|signal?: AbortSignal|}, | ||
): Promise<void> { | ||
const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK); | ||
|
||
const writePromises: Promise<void>[] = []; | ||
if (chunks === 1) { | ||
// If there's one chunk, don't slice the content | ||
await this.fs.writeFile(this.#getFilePath(key, 0), contents); | ||
return; | ||
writePromises.push( | ||
this.fs.writeFile(this.#getFilePath(key, 0), contents, { | ||
signal: options?.signal, | ||
}), | ||
); | ||
} else { | ||
for (let i = 0; i < chunks; i += 1) { | ||
writePromises.push( | ||
this.fs.writeFile( | ||
this.#getFilePath(key, i), | ||
typeof contents === 'string' | ||
? contents.slice( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
) | ||
: contents.subarray( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
), | ||
{signal: options?.signal}, | ||
), | ||
); | ||
} | ||
} | ||
|
||
const writePromises: Promise<void>[] = []; | ||
for (let i = 0; i < chunks; i += 1) { | ||
// If there's already a file following this chunk, it's old and should be removed | ||
if (await this.fs.exists(this.#getFilePath(key, chunks))) { | ||
writePromises.push( | ||
this.fs.writeFile( | ||
this.#getFilePath(key, i), | ||
typeof contents === 'string' | ||
? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK) | ||
: contents.subarray( | ||
i * WRITE_LIMIT_CHUNK, | ||
(i + 1) * WRITE_LIMIT_CHUNK, | ||
), | ||
), | ||
this.fs.unlink(this.#getFilePath(key, chunks), { | ||
signal: options?.signal, | ||
}), | ||
); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -846,6 +846,8 @@ export class RequestGraph extends ContentGraph< | |
} | ||
} | ||
|
||
const NODES_PER_BLOB = 2 ** 14; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How did we arrive at this number? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I profiled locally on my machine on how long it took to serialise. I'll document this on the constant so it can be tuned in the future if required. |
||
|
||
export default class RequestTracker { | ||
graph: RequestGraph; | ||
farm: WorkerFarm; | ||
|
@@ -1108,55 +1110,88 @@ export default class RequestTracker { | |
return {api, subRequestContentKeys}; | ||
} | ||
|
||
async writeToCache() { | ||
let cacheKey = getCacheKey(this.options); | ||
let requestGraphKey = hashString(`${cacheKey}:requestGraph`); | ||
let snapshotKey = hashString(`${cacheKey}:snapshot`); | ||
async writeToCache(signal?: AbortSignal) { | ||
const cacheKey = getCacheKey(this.options); | ||
const requestGraphKey = `requestGraph-${hashString(cacheKey)}`; | ||
const snapshotKey = `snapshot-${hashString(cacheKey)}`; | ||
|
||
if (this.options.shouldDisableCache) { | ||
return; | ||
} | ||
let total = 2; | ||
|
||
const serialisedGraph = this.graph.serialize(); | ||
|
||
let total = 0; | ||
const serialiseAndSet = async ( | ||
key: string, | ||
// $FlowFixMe serialise input is any type | ||
contents: any, | ||
): Promise<void> => { | ||
if (signal?.aborted) { | ||
throw new Error('Serialization was aborted'); | ||
} | ||
|
||
await this.options.cache.setLargeBlob( | ||
key, | ||
serialize(contents), | ||
signal | ||
? { | ||
signal: signal, | ||
} | ||
: undefined, | ||
); | ||
|
||
total += 1; | ||
|
||
report({ | ||
type: 'cache', | ||
phase: 'write', | ||
total, | ||
size: this.graph.nodes.length, | ||
}); | ||
}; | ||
|
||
const promises: Promise<void>[] = []; | ||
|
||
report({ | ||
type: 'cache', | ||
phase: 'start', | ||
total, | ||
size: this.graph.nodes.length, | ||
}); | ||
let promises = []; | ||
for (let node of this.graph.nodes) { | ||
if (!node || node.type !== REQUEST) { | ||
continue; | ||
} | ||
|
||
let resultCacheKey = node.resultCacheKey; | ||
if (resultCacheKey != null && node.result != null) { | ||
promises.push( | ||
this.options.cache.setLargeBlob( | ||
resultCacheKey, | ||
serialize(node.result), | ||
), | ||
); | ||
total++; | ||
report({ | ||
type: 'cache', | ||
phase: 'write', | ||
total, | ||
size: this.graph.nodes.length, | ||
}); | ||
delete node.result; | ||
// Preallocating a sparse array is faster than pushing when N is high enough | ||
const cacheableNodes = new Array(serialisedGraph.nodes.length); | ||
for (let i = 0; i < serialisedGraph.nodes.length; i += 1) { | ||
const node = serialisedGraph.nodes[i]; | ||
|
||
let resultCacheKey = node?.resultCacheKey; | ||
if ( | ||
node?.type === REQUEST && | ||
resultCacheKey != null && | ||
node?.result != null | ||
) { | ||
promises.push(serialiseAndSet(resultCacheKey, node.result)); | ||
// eslint-disable-next-line no-unused-vars | ||
const {result: _, ...newNode} = node; | ||
cacheableNodes[i] = newNode; | ||
} else { | ||
cacheableNodes[i] = node; | ||
} | ||
} | ||
|
||
for (let i = 0; i * NODES_PER_BLOB < cacheableNodes.length; i += 1) { | ||
promises.push( | ||
serialiseAndSet( | ||
`requestGraph-nodes-${i}-${hashString(cacheKey)}`, | ||
cacheableNodes.slice(i * NODES_PER_BLOB, (i + 1) * NODES_PER_BLOB), | ||
), | ||
); | ||
} | ||
|
||
promises.push( | ||
this.options.cache.setLargeBlob(requestGraphKey, serialize(this.graph)), | ||
serialiseAndSet(requestGraphKey, {...serialisedGraph, nodes: undefined}), | ||
); | ||
report({ | ||
type: 'cache', | ||
phase: 'write', | ||
total, | ||
size: this.graph.nodes.length, | ||
}); | ||
|
||
let opts = getWatcherOptions(this.options); | ||
let snapshotPath = path.join(this.options.cacheDir, snapshotKey + '.txt'); | ||
|
@@ -1167,15 +1202,10 @@ export default class RequestTracker { | |
opts, | ||
), | ||
); | ||
report({ | ||
type: 'cache', | ||
phase: 'write', | ||
total, | ||
size: this.graph.nodes.length, | ||
}); | ||
report({type: 'cache', phase: 'end', total, size: this.graph.nodes.length}); | ||
|
||
await Promise.all(promises); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How many concurrent promises will be typical here for a Jira cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (We also have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ah, yeah, it's even touched in this PR 😅 I was thinking of Jira where I have used |
||
|
||
report({type: 'cache', phase: 'end', total, size: this.graph.nodes.length}); | ||
} | ||
|
||
static async init({ | ||
|
@@ -1205,20 +1235,42 @@ async function loadRequestGraph(options): Async<RequestGraph> { | |
return new RequestGraph(); | ||
} | ||
|
||
let cacheKey = getCacheKey(options); | ||
let requestGraphKey = hashString(`${cacheKey}:requestGraph`); | ||
const cacheKey = getCacheKey(options); | ||
const hashedCacheKey = hashString(cacheKey); | ||
const requestGraphKey = `requestGraph-${hashedCacheKey}`; | ||
if (await options.cache.hasLargeBlob(requestGraphKey)) { | ||
let requestGraph: RequestGraph = deserialize( | ||
await options.cache.getLargeBlob(requestGraphKey), | ||
); | ||
const getAndDeserialize = async (key: string) => { | ||
return deserialize(await options.cache.getLargeBlob(key)); | ||
}; | ||
|
||
let i = 0; | ||
let nodePromises = []; | ||
while ( | ||
await options.cache.hasLargeBlob( | ||
`requestGraph-nodes-${i}-${hashedCacheKey}`, | ||
) | ||
) { | ||
nodePromises.push( | ||
getAndDeserialize(`requestGraph-nodes-${i}-${hashedCacheKey}`), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We generate this string at least 3 different times. Could move it to a function so it's clear they're tied together? |
||
); | ||
i += 1; | ||
} | ||
|
||
const serializedRequestGraph = await getAndDeserialize(requestGraphKey); | ||
const requestGraph = RequestGraph.deserialize({ | ||
...serializedRequestGraph, | ||
nodes: (await Promise.all(nodePromises)).flatMap(nodeChunk => nodeChunk), | ||
}); | ||
|
||
let opts = getWatcherOptions(options); | ||
let snapshotKey = hashString(`${cacheKey}:snapshot`); | ||
let snapshotKey = `snapshot-${hashedCacheKey}`; | ||
let snapshotPath = path.join(options.cacheDir, snapshotKey + '.txt'); | ||
let events = await options.inputFS.getEventsSince( | ||
options.projectRoot, | ||
snapshotPath, | ||
opts, | ||
); | ||
|
||
requestGraph.invalidateUnpredictableNodes(); | ||
requestGraph.invalidateOnBuildNodes(); | ||
requestGraph.invalidateEnvNodes(options.env); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it matter if it had extra chunks after that, or does that just become "garbage"?
i.e. if you had chunks 0, 1, 2, 3 (unlikely given the sizes - but still..), and next time have 0,1, you'll delete 2 but leave 3 dangling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I left it dangling as the intention was purely to cut the edge case of joining two different chunks together. In retrospect, it's not complex to just delete everything, so I'll add that.