@@ -18,6 +18,7 @@ const removeTrailingSlashes = require('./util/trailing-slashes.js')
18
18
const getContents = require ( '@npmcli/installed-package-contents' )
19
19
const readPackageJsonFast = require ( 'read-package-json-fast' )
20
20
const readPackageJson = promisify ( require ( 'read-package-json' ) )
21
+ const Minipass = require ( 'minipass' )
21
22
22
23
// we only change ownership on unix platforms, and only if uid is 0
23
24
const selfOwner = process . getuid && process . getuid ( ) === 0 ? {
@@ -219,40 +220,42 @@ class FetcherBase {
219
220
}
220
221
221
222
[ _istream ] ( stream ) {
222
- // everyone will need one of these, either for verifying or calculating
223
- // We always set it, because we have might only have a weak legacy hex
224
- // sha1 in the packument, and this MAY upgrade it to a stronger algo.
225
- // If we had an integrity, and it doesn't match, then this does not
226
- // override that error; the istream will raise the error before it
227
- // gets to the point of re-setting the integrity.
228
- const istream = ssri . integrityStream ( this . opts )
229
- istream . on ( 'integrity' , i => this . integrity = i )
230
- stream . on ( 'error' , er => istream . emit ( 'error' , er ) )
231
-
232
- // if not caching this, just pipe through to the istream and return it
223
+ // if not caching this, just return it
233
224
if ( ! this . opts . cache || ! this [ _cacheFetches ] ) {
225
+ // instead of creating a new integrity stream, we only piggyback on the
226
+ // provided stream's events
227
+ if ( stream . hasIntegrityEmitter ) {
228
+ stream . on ( 'integrity' , i => this . integrity = i )
229
+ return stream
230
+ }
231
+
232
+ const istream = ssri . integrityStream ( this . opts )
233
+ istream . on ( 'integrity' , i => this . integrity = i )
234
+ stream . on ( 'error' , err => istream . emit ( 'error' , err ) )
234
235
return stream . pipe ( istream )
235
236
}
236
237
237
238
// we have to return a stream that gets ALL the data, and proxies errors,
238
239
// but then pipe from the original tarball stream into the cache as well.
239
240
// To do this without losing any data, and since the cacache put stream
240
241
// is not a passthrough, we have to pipe from the original stream into
241
- // the cache AFTER we pipe into the istream . Since the cache stream
242
+ // the cache AFTER we pipe into the middleStream . Since the cache stream
242
243
// has an asynchronous flush to write its contents to disk, we need to
243
- // defer the istream end until the cache stream ends.
244
- stream . pipe ( istream , { end : false } )
244
+ // defer the middleStream end until the cache stream ends.
245
+ const middleStream = new Minipass ( )
246
+ stream . on ( 'error' , err => middleStream . emit ( 'error' , err ) )
247
+ stream . pipe ( middleStream , { end : false } )
245
248
const cstream = cacache . put . stream (
246
249
this . opts . cache ,
247
250
`pacote:tarball:${ this . from } ` ,
248
251
this . opts
249
252
)
253
+ cstream . on ( 'integrity' , i => this . integrity = i )
254
+ cstream . on ( 'error' , err => stream . emit ( 'error' , err ) )
250
255
stream . pipe ( cstream )
251
- // defer istream end until after cstream
252
- // cache write errors should not crash the fetch, this is best-effort.
253
- cstream . promise ( ) . catch ( ( ) => { } ) . then ( ( ) => istream . end ( ) )
254
256
255
- return istream
257
+ cstream . promise ( ) . catch ( ( ) => { } ) . then ( ( ) => middleStream . end ( ) )
258
+ return middleStream
256
259
}
257
260
258
261
pickIntegrityAlgorithm ( ) {
0 commit comments