Skip to content

Commit 40852ff

Browse files
mpodlasinbenlesh
authored andcommittedJul 26, 2018
fix(mergeAll): add source subscription to composite before actually subscribing (#2479)
Add subscriptions for source Observables to mergeAll composite subscription before actually subscribing to any of these Observables, so that if source Observable emits synchronously and consumer of mergeAll unsubscribes at that moment (for example `take` operator), subscription to source is unsubscribed as well and Observable stops emitting. Closes #2476
1 parent 0979d31 commit 40852ff

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed
 

‎spec/operators/mergeAll-spec.ts

+31-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { mergeAll, mergeMap } from 'rxjs/operators';
2+
import { mergeAll, mergeMap, take } from 'rxjs/operators';
33
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
44
import { throwError, from, of, Observable } from 'rxjs';
55

@@ -413,6 +413,36 @@ describe('mergeAll oeprator', () => {
413413
() => { done(new Error('should not be called')); });
414414
});
415415

416+
it('should finalize generators when merged if the subscription ends', () => {
417+
const iterable = {
418+
finalized: false,
419+
next() {
420+
return {value: 'duck', done: false};
421+
},
422+
return() {
423+
this.finalized = true;
424+
},
425+
[Symbol.iterator]() {
426+
return this;
427+
}
428+
};
429+
430+
const results: string[] = [];
431+
432+
const iterableObservable = from<string>(iterable as any);
433+
of(iterableObservable).pipe(
434+
mergeAll(),
435+
take(3)
436+
).subscribe(
437+
x => results.push(x),
438+
null,
439+
() => results.push('GOOSE!')
440+
);
441+
442+
expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
443+
expect(iterable.finalized).to.be.true;
444+
});
445+
416446
type(() => {
417447
/* tslint:disable:no-unused-variable */
418448
const source1 = of(1, 2, 3);

‎src/internal/operators/mergeMap.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
139139
}
140140

141141
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
142-
this.add(subscribeToResult<T, R>(this, ish, value, index));
142+
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
143+
this.add(innerSubscriber);
144+
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
143145
}
144146

145147
protected _complete(): void {
+18-11
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
21
import { ObservableInput } from '../types';
32
import { Subscription } from '../Subscription';
43
import { InnerSubscriber } from '../InnerSubscriber';
54
import { OuterSubscriber } from '../OuterSubscriber';
5+
import { Subscriber } from '../Subscriber';
66
import { subscribeTo } from './subscribeTo';
77

8-
export function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
9-
result: any,
10-
outerValue?: T,
11-
outerIndex?: number): Subscription;
12-
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
13-
result: ObservableInput<T>,
14-
outerValue?: T,
15-
outerIndex?: number): Subscription | void {
16-
const destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);
17-
8+
export function subscribeToResult<T, R>(
9+
outerSubscriber: OuterSubscriber<T, R>,
10+
result: any,
11+
outerValue?: T,
12+
outerIndex?: number,
13+
destination?: Subscriber<any>
14+
): Subscription;
15+
export function subscribeToResult<T, R>(
16+
outerSubscriber: OuterSubscriber<T, R>,
17+
result: any,
18+
outerValue?: T,
19+
outerIndex?: number,
20+
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
21+
): Subscription | void {
22+
if (destination.closed) {
23+
return;
24+
}
1825
return subscribeTo(result)(destination);
1926
}

0 commit comments

Comments
 (0)
Please sign in to comment.