1
1
using System ;
2
2
using System . Linq ;
3
+ using System . Reactive ;
3
4
using System . Reactive . Linq ;
5
+ using System . Reactive . Subjects ;
4
6
using System . Threading . Tasks ;
5
7
6
8
using Bogus ;
@@ -30,12 +32,14 @@ public class GroupOnObservableFixture : IDisposable
30
32
private readonly SourceCache < Person , string > _cache = new ( p => p . UniqueKey ) ;
31
33
private readonly ChangeSetAggregator < Person , string > _results ;
32
34
private readonly GroupChangeSetAggregator < Person , string , Color > _groupResults ;
35
+ private readonly Subject < Unit > _grouperShutdown ;
33
36
private readonly Faker < Person > _faker ;
34
37
private readonly Randomizer _randomizer = new ( 0x3141_5926 ) ;
35
38
36
39
public GroupOnObservableFixture ( )
37
40
{
38
41
_faker = Fakers . Person . Clone ( ) . WithSeed ( _randomizer ) ;
42
+ _grouperShutdown = new ( ) ;
39
43
_results = _cache . Connect ( ) . AsAggregator ( ) ;
40
44
_groupResults = _cache . Connect ( ) . GroupOnObservable ( CreateFavoriteColorObservable ) . AsAggregator ( ) ;
41
45
}
@@ -179,7 +183,7 @@ public void GroupingSequenceCompletesWhenEmpty()
179
183
}
180
184
181
185
[ Fact ]
182
- public void AllSequencesCompleteWhenSourceIsDisposed ( )
186
+ public void AllSequencesShouldCompleteWhenSourceAndGroupingObservablesComplete ( )
183
187
{
184
188
// Arrange
185
189
_cache . AddOrUpdate ( _faker . Generate ( InitialCount ) ) ;
@@ -190,6 +194,7 @@ public void AllSequencesCompleteWhenSourceIsDisposed()
190
194
191
195
// Act
192
196
_cache . Dispose ( ) ;
197
+ _grouperShutdown . OnNext ( Unit . Default ) ;
193
198
194
199
// Assert
195
200
results . IsCompleted . Should ( ) . BeTrue ( ) ;
@@ -243,9 +248,11 @@ public async Task ResultsContainsCorrectRegroupedValuesAsync()
243
248
}
244
249
245
250
[ Theory ]
246
- [ InlineData ( false ) ]
247
- [ InlineData ( true ) ]
248
- public void ResultCompletesOnlyWhenSourceCompletes ( bool completeSource )
251
+ [ InlineData ( false , false ) ]
252
+ [ InlineData ( true , false ) ]
253
+ [ InlineData ( false , true ) ]
254
+ [ InlineData ( true , true ) ]
255
+ public void ResultCompletesOnlyWhenSourceAndAllGroupingObservablesComplete ( bool completeSource , bool completeGroups )
249
256
{
250
257
// Arrange
251
258
_cache . AddOrUpdate ( _faker . Generate ( InitialCount ) ) ;
@@ -255,10 +262,14 @@ public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource)
255
262
{
256
263
_cache . Dispose ( ) ;
257
264
}
265
+ if ( completeGroups )
266
+ {
267
+ _grouperShutdown . OnNext ( Unit . Default ) ;
268
+ }
258
269
259
270
// Assert
260
271
_results . IsCompleted . Should ( ) . Be ( completeSource ) ;
261
- _groupResults . IsCompleted . Should ( ) . Be ( completeSource ) ;
272
+ _groupResults . IsCompleted . Should ( ) . Be ( completeGroups && completeSource ) ;
262
273
}
263
274
264
275
[ Fact ]
@@ -311,6 +322,7 @@ public void Dispose()
311
322
_groupResults . Dispose ( ) ;
312
323
_results . Dispose ( ) ;
313
324
_cache . Dispose ( ) ;
325
+ _grouperShutdown . Dispose ( ) ;
314
326
}
315
327
316
328
private void RandomFavoriteColorChange ( )
@@ -342,6 +354,6 @@ private static void VerifyGroupingResults(ISourceCache<Person, string> cache, Ch
342
354
groupResults . Groups . Items . ForEach ( group => group . Data . Count . Should ( ) . BeGreaterThan ( 0 , "Empty groups should be removed" ) ) ;
343
355
}
344
356
345
- private static IObservable < Color > CreateFavoriteColorObservable ( Person person , string key ) =>
346
- person . WhenPropertyChanged ( p => p . FavoriteColor ) . Select ( change => change . Value ) ;
357
+ private IObservable < Color > CreateFavoriteColorObservable ( Person person , string key ) =>
358
+ person . WhenPropertyChanged ( p => p . FavoriteColor ) . Select ( change => change . Value ) . TakeUntil ( _grouperShutdown ) ;
347
359
}
0 commit comments