@@ -11,28 +11,45 @@ internal sealed class ToObservableOptional<TObject, TKey>(IObservable<IChangeSet
11
11
where TObject : notnull
12
12
where TKey : notnull
13
13
{
14
+ private readonly IEqualityComparer < TObject > _equalityComparer = equalityComparer ?? EqualityComparer < TObject > . Default ;
14
15
private readonly IObservable < IChangeSet < TObject , TKey > > _source = source ?? throw new ArgumentNullException ( nameof ( source ) ) ;
15
16
private readonly TKey _key = key ;
16
17
17
18
public IObservable < Optional < TObject > > Run ( ) => Observable . Create < Optional < TObject > > ( observer =>
18
- _source . Subscribe (
19
- changes =>
20
- changes . Where ( ShouldEmitChange ) . ForEach ( change => observer . OnNext ( change switch
21
- {
22
- { Reason : ChangeReason . Remove } => Optional . None < TObject > ( ) ,
23
- _ => Optional . Some ( change . Current ) ,
24
- } ) ) ,
25
- observer . OnError ,
26
- observer . OnCompleted ) ) ;
27
-
28
- private bool ShouldEmitChange ( Change < TObject , TKey > change ) => change switch
29
19
{
30
- { Key : { } thekey } when ! thekey . Equals ( _key ) => false ,
31
- { Reason : ChangeReason . Add } => true ,
32
- { Reason : ChangeReason . Remove } => true ,
33
- { Reason : ChangeReason . Update , Previous . HasValue : false } => true ,
34
- { Reason : ChangeReason . Update } when equalityComparer is not null => ! equalityComparer . Equals ( change . Current , change . Previous . Value ) ,
35
- { Reason : ChangeReason . Update } => ! ReferenceEquals ( change . Current , change . Previous . Value ) ,
36
- _ => false ,
37
- } ;
20
+ var lastValue = Optional . None < TObject > ( ) ;
21
+
22
+ return _source . Subscribe (
23
+ changes => lastValue = EmitChanges ( changes , observer , lastValue ) ,
24
+ observer . OnError ,
25
+ observer . OnCompleted ) ;
26
+ } ) ;
27
+
28
+ private Optional < TObject > EmitChanges ( IChangeSet < TObject , TKey > changes , IObserver < Optional < TObject > > observer , Optional < TObject > lastValue )
29
+ {
30
+ foreach ( var change in changes . ToConcreteType ( ) )
31
+ {
32
+ // Ignore changes for different keys
33
+ if ( ! change . Key . Equals ( _key ) )
34
+ {
35
+ continue ;
36
+ }
37
+
38
+ // Remove is None, everything else is the current value
39
+ var emitValue = change switch
40
+ {
41
+ { Reason : ChangeReason . Remove } => Optional . None < TObject > ( ) ,
42
+ _ => Optional . Some ( change . Current ) ,
43
+ } ;
44
+
45
+ // Emit the value if it has changed
46
+ if ( ( emitValue . HasValue != lastValue . HasValue ) || ( emitValue . HasValue && ! _equalityComparer . Equals ( lastValue . Value , emitValue . Value ) ) )
47
+ {
48
+ observer . OnNext ( emitValue ) ;
49
+ lastValue = emitValue ;
50
+ }
51
+ }
52
+
53
+ return lastValue ;
54
+ }
38
55
}
0 commit comments