Skip to content

Commit 37ffe9d

Browse files
authoredDec 27, 2024··
Added new Filter operators that utilize a predicate state stream, to help avoid unneccessary allocations of a new filter predicate delegate, every time the consumer desires to change filtering logic. (#941)
1 parent dd1f54d commit 37ffe9d

14 files changed

+3987
-19
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System;
2+
using System.Collections.Immutable;
3+
using System.Linq;
4+
using System.Reactive.Subjects;
5+
6+
using BenchmarkDotNet.Attributes;
7+
8+
using Bogus;
9+
10+
namespace DynamicData.Benchmarks.Cache;
11+
12+
[MemoryDiagnoser]
13+
[MarkdownExporterAttribute.GitHub]
14+
public class Filter_Cache_WithPredicateState
15+
{
16+
public Filter_Cache_WithPredicateState()
17+
{
18+
// Not exercising Moved, since ChangeAwareCache<> doesn't support it, and I'm too lazy to implement it by hand.
19+
var changeReasons = new[]
20+
{
21+
ChangeReason.Add,
22+
ChangeReason.Refresh,
23+
ChangeReason.Remove,
24+
ChangeReason.Update
25+
};
26+
27+
// Weights are chosen to make the cache size likely to grow over time,
28+
// exerting more pressure on the system the longer the benchmark runs.
29+
// Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache).
30+
var changeReasonWeightsWhenCountIs0 = new[]
31+
{
32+
1f, // Add
33+
0f, // Refresh
34+
0f, // Remove
35+
0f // Update
36+
};
37+
38+
var changeReasonWeightsOtherwise = new[]
39+
{
40+
0.30f, // Add
41+
0.25f, // Refresh
42+
0.20f, // Remove
43+
0.25f // Update
44+
};
45+
46+
var maxChangeCount = 20;
47+
48+
var randomizer = new Randomizer(0x1234567);
49+
50+
var changeSets = ImmutableArray.CreateBuilder<IChangeSet<Item, int>>(initialCapacity: 5_000);
51+
var nextItemId = 1;
52+
var items = new ChangeAwareCache<Item, int>();
53+
while (changeSets.Count < changeSets.Capacity)
54+
{
55+
var changeCount = randomizer.Int(1, maxChangeCount);
56+
for (var i = 0; i < changeCount; ++i)
57+
{
58+
var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch
59+
{
60+
0 => changeReasonWeightsWhenCountIs0,
61+
_ => changeReasonWeightsOtherwise
62+
});
63+
64+
switch (changeReason)
65+
{
66+
case ChangeReason.Add:
67+
items.AddOrUpdate(
68+
item: new Item()
69+
{
70+
Id = nextItemId,
71+
IsIncluded = randomizer.Bool()
72+
},
73+
key: nextItemId);
74+
++nextItemId;
75+
break;
76+
77+
case ChangeReason.Refresh:
78+
items.Refresh(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)));
79+
break;
80+
81+
case ChangeReason.Remove:
82+
items.Remove(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)));
83+
break;
84+
85+
case ChangeReason.Update:
86+
var id = items.Keys.ElementAt(randomizer.Int(0, items.Count - 1));
87+
items.AddOrUpdate(
88+
item: new Item()
89+
{
90+
Id = id,
91+
IsIncluded = randomizer.Bool()
92+
},
93+
key: id);
94+
break;
95+
}
96+
}
97+
98+
changeSets.Add(items.CaptureChanges());
99+
}
100+
_changeSets = changeSets.MoveToImmutable();
101+
102+
103+
var predicateStates = ImmutableArray.CreateBuilder<int>(initialCapacity: 5_000);
104+
while (predicateStates.Count < predicateStates.Capacity)
105+
predicateStates.Add(randomizer.Int());
106+
_predicateStates = predicateStates.MoveToImmutable();
107+
}
108+
109+
[Benchmark(Baseline = true)]
110+
public void RandomizedEditsAndStateChanges()
111+
{
112+
using var source = new Subject<IChangeSet<Item, int>>();
113+
using var predicateState = new Subject<int>();
114+
115+
using var subscription = source
116+
.Filter(
117+
predicateState: predicateState,
118+
predicate: Item.FilterByIdInclusionMask)
119+
.Subscribe();
120+
121+
PublishNotifications(source, predicateState);
122+
123+
subscription.Dispose();
124+
}
125+
126+
private void PublishNotifications(
127+
IObserver<IChangeSet<Item, int>> source,
128+
IObserver<int> predicateState)
129+
{
130+
int i;
131+
for (i = 0; (i < _changeSets.Length) && (i < _predicateStates.Length); ++i)
132+
{
133+
source.OnNext(_changeSets[i]);
134+
predicateState.OnNext(_predicateStates[i]);
135+
}
136+
137+
for (; i < _changeSets.Length; ++i)
138+
source.OnNext(_changeSets[i]);
139+
140+
for (; i < _predicateStates.Length; ++i)
141+
predicateState.OnNext(_predicateStates[i]);
142+
}
143+
144+
private readonly ImmutableArray<IChangeSet<Item, int>> _changeSets;
145+
private readonly ImmutableArray<int> _predicateStates;
146+
147+
public class Item
148+
{
149+
public static bool FilterByIdInclusionMask(
150+
int idInclusionMask,
151+
Item item)
152+
=> ((item.Id & idInclusionMask) == 0) && item.IsIncluded;
153+
154+
public required int Id { get; init; }
155+
156+
public bool IsIncluded { get; set; }
157+
158+
public override string ToString()
159+
=> $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}";
160+
}
161+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
using System;
2+
using System.Collections.Immutable;
3+
using System.Linq;
4+
using System.Reactive.Subjects;
5+
6+
using BenchmarkDotNet.Attributes;
7+
8+
using Bogus;
9+
10+
namespace DynamicData.Benchmarks.List;
11+
12+
[MemoryDiagnoser]
13+
[MarkdownExporterAttribute.GitHub]
14+
public class Filter_List_WithPredicateState
15+
{
16+
public Filter_List_WithPredicateState()
17+
{
18+
var randomizer = new Randomizer(0x1234567);
19+
20+
_changeSets = GenerateStressItemsAndChangeSets(
21+
editCount: 5_000,
22+
maxChangeCount: 20,
23+
maxRangeSize: 10,
24+
randomizer: randomizer);
25+
26+
_predicateStates = GenerateRandomPredicateStates(
27+
valueCount: 5_000,
28+
randomizer: randomizer);
29+
}
30+
31+
[Params(ListFilterPolicy.CalculateDiff, ListFilterPolicy.ClearAndReplace)]
32+
public ListFilterPolicy FilterPolicy { get; set; }
33+
34+
[Benchmark(Baseline = true)]
35+
public void RandomizedEditsAndStateChanges()
36+
{
37+
using var source = new Subject<IChangeSet<Item>>();
38+
using var predicateState = new Subject<int>();
39+
40+
using var subscription = source
41+
.Filter(
42+
predicateState: predicateState,
43+
predicate: Item.FilterByIdInclusionMask,
44+
filterPolicy: FilterPolicy)
45+
.Subscribe();
46+
47+
PublishNotifications(source, predicateState);
48+
49+
subscription.Dispose();
50+
}
51+
52+
private static ImmutableArray<IChangeSet<Item>> GenerateStressItemsAndChangeSets(
53+
int editCount,
54+
int maxChangeCount,
55+
int maxRangeSize,
56+
Randomizer randomizer)
57+
{
58+
var changeReasons = new[]
59+
{
60+
ListChangeReason.Add,
61+
ListChangeReason.AddRange,
62+
ListChangeReason.Clear,
63+
ListChangeReason.Moved,
64+
ListChangeReason.Refresh,
65+
ListChangeReason.Remove,
66+
ListChangeReason.RemoveRange,
67+
ListChangeReason.Replace
68+
};
69+
70+
// Weights are chosen to make the cache size likely to grow over time,
71+
// exerting more pressure on the system the longer the benchmark runs.
72+
// Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache).
73+
var changeReasonWeightsWhenCountIs0 = new[]
74+
{
75+
0.5f, // Add
76+
0.5f, // AddRange
77+
0.0f, // Clear
78+
0.0f, // Moved
79+
0.0f, // Refresh
80+
0.0f, // Remove
81+
0.0f, // RemoveRange
82+
0.0f // Replace
83+
};
84+
85+
var changeReasonWeightsWhenCountIs1 = new[]
86+
{
87+
0.400f, // Add
88+
0.400f, // AddRange
89+
0.001f, // Clear
90+
0.000f, // Moved
91+
0.000f, // Refresh
92+
0.199f, // Remove
93+
0.000f, // RemoveRange
94+
0.000f // Replace
95+
};
96+
97+
var changeReasonWeightsOtherwise = new[]
98+
{
99+
0.250f, // Add
100+
0.250f, // AddRange
101+
0.001f, // Clear
102+
0.100f, // Moved
103+
0.099f, // Refresh
104+
0.100f, // Remove
105+
0.100f, // RemoveRange
106+
0.100f // Replace
107+
};
108+
109+
var nextItemId = 1;
110+
111+
var changeSets = ImmutableArray.CreateBuilder<IChangeSet<Item>>(initialCapacity: editCount);
112+
113+
var items = new ChangeAwareList<Item>();
114+
115+
while (changeSets.Count < changeSets.Capacity)
116+
{
117+
var changeCount = randomizer.Int(1, maxChangeCount);
118+
for (var i = 0; i < changeCount; ++i)
119+
{
120+
var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch
121+
{
122+
0 => changeReasonWeightsWhenCountIs0,
123+
1 => changeReasonWeightsWhenCountIs1,
124+
_ => changeReasonWeightsOtherwise
125+
});
126+
127+
switch (changeReason)
128+
{
129+
case ListChangeReason.Add:
130+
items.Add(new Item()
131+
{
132+
Id = nextItemId++,
133+
IsIncluded = randomizer.Bool()
134+
});
135+
break;
136+
137+
case ListChangeReason.AddRange:
138+
items.AddRange(Enumerable.Repeat(0, randomizer.Int(1, maxRangeSize))
139+
.Select(_ => new Item()
140+
{
141+
Id = nextItemId++,
142+
IsIncluded = randomizer.Bool()
143+
}));
144+
break;
145+
146+
case ListChangeReason.Clear:
147+
items.Clear();
148+
break;
149+
150+
case ListChangeReason.Moved:
151+
items.Move(
152+
original: randomizer.Int(0, items.Count - 1),
153+
destination: randomizer.Int(0, items.Count - 1));
154+
break;
155+
156+
case ListChangeReason.Refresh:
157+
items.RefreshAt(randomizer.Int(0, items.Count - 1));
158+
break;
159+
160+
case ListChangeReason.Remove:
161+
items.RemoveAt(randomizer.Int(0, items.Count - 1));
162+
break;
163+
164+
case ListChangeReason.RemoveRange:
165+
{
166+
var rangeStartIndex = randomizer.Int(0, items.Count - 1);
167+
168+
items.RemoveRange(
169+
index: rangeStartIndex,
170+
count: Math.Min(items.Count - rangeStartIndex, randomizer.Int(1, maxRangeSize)));
171+
}
172+
break;
173+
174+
case ListChangeReason.Replace:
175+
items[randomizer.Int(0, items.Count - 1)] = new Item()
176+
{
177+
Id = nextItemId++,
178+
IsIncluded = randomizer.Bool()
179+
};
180+
break;
181+
}
182+
}
183+
184+
changeSets.Add(items.CaptureChanges());
185+
}
186+
187+
return changeSets.MoveToImmutable();
188+
}
189+
190+
private static ImmutableArray<int> GenerateRandomPredicateStates(
191+
int valueCount,
192+
Randomizer randomizer)
193+
{
194+
var values = ImmutableArray.CreateBuilder<int>(initialCapacity: valueCount);
195+
196+
while (values.Count < valueCount)
197+
values.Add(randomizer.Int());
198+
199+
return values.MoveToImmutable();
200+
}
201+
202+
private void PublishNotifications(
203+
IObserver<IChangeSet<Item>> source,
204+
IObserver<int> predicateState)
205+
{
206+
int i;
207+
for (i = 0; (i < _changeSets.Length) && (i < _predicateStates.Length); ++i)
208+
{
209+
source.OnNext(_changeSets[i]);
210+
predicateState.OnNext(_predicateStates[i]);
211+
}
212+
213+
for (; i < _changeSets.Length; ++i)
214+
source.OnNext(_changeSets[i]);
215+
216+
for (; i < _predicateStates.Length; ++i)
217+
predicateState.OnNext(_predicateStates[i]);
218+
}
219+
220+
private readonly ImmutableArray<IChangeSet<Item>> _changeSets;
221+
private readonly ImmutableArray<int> _predicateStates;
222+
223+
public class Item
224+
{
225+
public static bool FilterByIdInclusionMask(
226+
int idInclusionMask,
227+
Item item)
228+
=> ((item.Id & idInclusionMask) == 0) && item.IsIncluded;
229+
230+
public required int Id { get; init; }
231+
232+
public bool IsIncluded { get; set; }
233+
234+
public override string ToString()
235+
=> $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}";
236+
}
237+
}

‎src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt

+5
Original file line numberDiff line numberDiff line change
@@ -1327,6 +1327,9 @@ namespace DynamicData
13271327
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> Filter<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.IObservable<System.Func<TObject, bool>> predicateChanged, System.IObservable<System.Reactive.Unit> reapplyFilter, bool suppressEmptyChangeSets = true)
13281328
where TObject : notnull
13291329
where TKey : notnull { }
1330+
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> Filter<TObject, TKey, TState>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.IObservable<TState> predicateState, System.Func<TState, TObject, bool> predicate, bool suppressEmptyChangeSets = true)
1331+
where TObject : notnull
1332+
where TKey : notnull { }
13301333
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> FilterImmutable<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Func<TObject, bool> predicate, bool suppressEmptyChangeSets = true)
13311334
where TObject : notnull
13321335
where TKey : notnull { }
@@ -2325,6 +2328,8 @@ namespace DynamicData
23252328
where T : notnull { }
23262329
public static System.IObservable<DynamicData.IChangeSet<T>> Filter<T>(this System.IObservable<DynamicData.IChangeSet<T>> source, System.IObservable<System.Func<T, bool>> predicate, DynamicData.ListFilterPolicy filterPolicy = 1)
23272330
where T : notnull { }
2331+
public static System.IObservable<DynamicData.IChangeSet<T>> Filter<T, TState>(this System.IObservable<DynamicData.IChangeSet<T>> source, System.IObservable<TState> predicateState, System.Func<TState, T, bool> predicate, DynamicData.ListFilterPolicy filterPolicy = 1, bool suppressEmptyChangeSets = true)
2332+
where T : notnull { }
23282333
public static System.IObservable<DynamicData.IChangeSet<TObject>> FilterOnObservable<TObject>(this System.IObservable<DynamicData.IChangeSet<TObject>> source, System.Func<TObject, System.IObservable<bool>> objectFilterObservable, System.TimeSpan? propertyChangedThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null)
23292334
where TObject : notnull { }
23302335
[System.Obsolete("Use AutoRefresh(), followed by Filter() instead")]

‎src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs

+805
Large diffs are not rendered by default.

‎src/DynamicData.Tests/Cache/FilterFixture.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace DynamicData.Tests.Cache;
1212

13-
public class FilterFixture : IDisposable
13+
public partial class FilterFixture : IDisposable
1414
{
1515
private readonly ChangeSetAggregator<Person, string> _results;
1616

‎src/DynamicData.Tests/List/FilterFixture.WithPredicateState.cs

+1,031
Large diffs are not rendered by default.

‎src/DynamicData.Tests/List/FilterFixture.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace DynamicData.Tests.List;
1111

12-
public class FilterFixture : IDisposable
12+
public partial class FilterFixture : IDisposable
1313
{
1414
private readonly ChangeSetAggregator<Person> _results;
1515

‎src/DynamicData.Tests/Utilities/ObservableExtensions.cs

+26-15
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ public static IDisposable RecordCacheItems<TObject, TKey>(
6464
return source.Subscribe(observer);
6565
}
6666

67+
public static IDisposable RecordListItems<T>(
68+
this IObservable<IChangeSet<T>> source,
69+
out ListItemRecordingObserver<T> observer,
70+
IScheduler? scheduler = null)
71+
where T : notnull
72+
{
73+
observer = new ListItemRecordingObserver<T>(scheduler ?? GlobalConfig.DefaultScheduler);
74+
75+
return source.Subscribe(observer);
76+
}
77+
6778
public static IDisposable RecordValues<T>(
6879
this IObservable<T> source,
6980
out ValueRecordingObserver<T> observer,
@@ -84,6 +95,8 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
8495

8596
var reasons = Enum.GetValues<ListChangeReason>();
8697

98+
var receivedChangeSets = new List<IChangeSet<T>>();
99+
87100
return source.SubscribeSafe(RawAnonymousObserver.Create<IChangeSet<T>>(
88101
onNext: changes =>
89102
{
@@ -99,16 +112,11 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
99112
{
100113
case ChangeType.Item:
101114
change.Item.Reason.Should().Be(change.Reason);
102-
103115
change.Range.Should().BeEmpty("single-item changes should not specify range info");
104116
break;
105117

106118
case ChangeType.Range:
107-
change.Item.Reason.Should().Be(default, "range changes should not specify single-item info");
108-
change.Item.PreviousIndex.Should().Be(-1, "range changes should not specify single-item info");
109-
change.Item.Previous.HasValue.Should().BeFalse("range changes should not specify single-item info");
110-
change.Item.CurrentIndex.Should().Be(-1, "range changes should not specify single-item info");
111-
change.Item.Current.Should().Be(default, "range changes should not specify single-item info");
119+
change.Item.Should().Be(default(ItemChange<T>), "range changes should not specify single-item info");
112120
break;
113121
}
114122

@@ -129,7 +137,7 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
129137
break;
130138

131139
case ListChangeReason.AddRange:
132-
change.Range.Index.Should().BeInRange(-1, sortedItems.Count - 1, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection");
140+
change.Range.Index.Should().BeInRange(-1, sortedItems.Count, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection");
133141
if (change.Range.Index is -1)
134142
sortedItems.AddRange(change.Range);
135143
else
@@ -140,11 +148,9 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
140148
break;
141149

142150
case ListChangeReason.Clear:
143-
change.Range.Index.Should().Be(-1, "a Clear change has no target index");
144-
change.Range.Should().BeEquivalentTo(
145-
sortedItems,
146-
config => config.WithStrictOrdering(),
147-
"items in the range should match the corresponding items in the collection");
151+
change.Range.Index.Should().Be(-1, "a Clear change applies to an entire collection, it does not have a specific index");
152+
// The fact that ChangeAwareList can generate Clear changesets with items listed not in the order that they appear in the source seems like a defect to me. Maybe fix?
153+
change.Range.Should().BeEquivalentTo(sortedItems, "items in the range should match the corresponding items in the collection");
148154

149155
sortedItems.Clear();
150156

@@ -169,7 +175,8 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
169175
sortedItems.Should().NotBeEmpty("an item cannot be refreshed within an empty collection");
170176

171177
change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index");
172-
change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item");
178+
// This should likely be fixed. The purpose of Refresh changes is to force re-evaluation of an item that specifically has not changed, the previous item will always be the current item, by definition.
179+
//change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item");
173180
change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the target index should be a valid index of the collection");
174181
change.Item.Current.Should().Be(sortedItems[change.Item.CurrentIndex], "the item to be refreshed should match the corresponding item in the collection");
175182

@@ -211,10 +218,10 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
211218
case ListChangeReason.Replace:
212219
sortedItems.Should().NotBeEmpty("an item cannot be replaced within an empty collection");
213220

214-
change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index");
221+
change.Item.PreviousIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index of replacement should be a valid index of the collection");
215222
change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index to be replaced should be a valid index of the collection");
216223
change.Item.Previous.HasValue.Should().BeTrue("a Replace change should specify a previous item");
217-
change.Item.Previous.Should().Be(sortedItems[change.Item.CurrentIndex], "the replaced item should match the corresponding item in the collection");
224+
change.Item.Previous.Value.Should().Be(sortedItems[change.Item.CurrentIndex], "the replaced item should match the corresponding item in the collection");
218225

219226
sortedItems[change.Item.CurrentIndex] = change.Item.Current;
220227

@@ -226,6 +233,10 @@ public static IObservable<IChangeSet<T>> ValidateChangeSets<T>(this IObservable<
226233
{
227234
observer.OnError(ex);
228235
}
236+
237+
observer.OnNext(changes);
238+
239+
receivedChangeSets.Add(changes);
229240
},
230241
onError: observer.OnError,
231242
onCompleted: observer.OnCompleted));

‎src/DynamicData.Tests/Utilities/TestSourceList.cs

+41-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Reactive.Linq;
45
using System.Reactive.Subjects;
56

@@ -12,12 +13,16 @@ public sealed class TestSourceList<T>
1213
private readonly IObservable<int> _countChanged;
1314
private readonly BehaviorSubject<Exception?> _error;
1415
private readonly BehaviorSubject<bool> _hasCompleted;
16+
private readonly Subject<IChangeSet<T>> _refreshRequested;
17+
private readonly Subject<IChangeSet<T>> _refreshRequestedPreview;
1518
private readonly SourceList<T> _source;
1619

1720
public TestSourceList()
1821
{
1922
_error = new(null);
2023
_hasCompleted = new(false);
24+
_refreshRequested = new();
25+
_refreshRequestedPreview = new();
2126
_source = new();
2227

2328
_countChanged = WrapStream(_source.CountChanged);
@@ -33,7 +38,9 @@ public IReadOnlyList<T> Items
3338
=> _source.Items;
3439

3540
public IObservable<IChangeSet<T>> Connect(Func<T, bool>? predicate = null)
36-
=> WrapStream(_source.Connect(predicate));
41+
=> WrapStream(Observable.Merge(
42+
_source.Connect(predicate),
43+
_refreshRequested));
3744

3845
public void Complete()
3946
{
@@ -47,6 +54,8 @@ public void Dispose()
4754
_source.Dispose();
4855
_error.Dispose();
4956
_hasCompleted.Dispose();
57+
_refreshRequested.Dispose();
58+
_refreshRequestedPreview.Dispose();
5059
}
5160

5261
public void Edit(Action<IExtendedList<T>> updateAction)
@@ -57,7 +66,37 @@ public void Edit(Action<IExtendedList<T>> updateAction)
5766
}
5867

5968
public IObservable<IChangeSet<T>> Preview(Func<T, bool>? predicate = null)
60-
=> WrapStream(_source.Preview(predicate));
69+
=> WrapStream(Observable.Merge(
70+
_source.Preview(predicate),
71+
_refreshRequestedPreview));
72+
73+
// TODO: Formally add this to ISourceList
74+
public void Refresh(int index)
75+
{
76+
var changeSet = new ChangeSet<T>(capacity: 1)
77+
{
78+
new Change<T>(
79+
reason: ListChangeReason.Refresh,
80+
current: _source.Items.ElementAt(index),
81+
index: index)
82+
};
83+
84+
_refreshRequestedPreview.OnNext(changeSet);
85+
_refreshRequested.OnNext(changeSet);
86+
}
87+
88+
// TODO: Formally add this to ISourceList
89+
public void Refresh(IEnumerable<int> indexes)
90+
{
91+
var changeSet = new ChangeSet<T>(indexes
92+
.Select(index => new Change<T>(
93+
reason: ListChangeReason.Refresh,
94+
current: _source.Items.ElementAt(index),
95+
index: index)));
96+
97+
_refreshRequestedPreview.OnNext(changeSet);
98+
_refreshRequested.OnNext(changeSet);
99+
}
61100

62101
public void SetError(Exception error)
63102
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
2+
// Roland Pheasant licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System;
6+
using System.Reactive;
7+
using System.Reactive.Disposables;
8+
using System.Reactive.Linq;
9+
using System.Transactions;
10+
11+
using DynamicData.Internal;
12+
13+
namespace DynamicData.Cache.Internal;
14+
15+
internal static partial class Filter
16+
{
17+
public static class WithPredicateState<TObject, TKey, TState>
18+
where TObject : notnull
19+
where TKey : notnull
20+
{
21+
public static IObservable<IChangeSet<TObject, TKey>> Create(
22+
IObservable<IChangeSet<TObject, TKey>> source,
23+
IObservable<TState> predicateState,
24+
Func<TState, TObject, bool> predicate,
25+
bool suppressEmptyChangeSets)
26+
{
27+
source.ThrowArgumentNullExceptionIfNull(nameof(source));
28+
predicateState.ThrowArgumentNullExceptionIfNull(nameof(predicateState));
29+
predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate));
30+
31+
return Observable.Create<IChangeSet<TObject, TKey>>(downstreamObserver => new Subscription(
32+
downstreamObserver: downstreamObserver,
33+
predicate: predicate,
34+
predicateState: predicateState,
35+
source: source,
36+
suppressEmptyChangeSets: suppressEmptyChangeSets));
37+
}
38+
39+
private sealed class Subscription
40+
: IDisposable
41+
{
42+
private readonly List<Change<TObject, TKey>> _downstreamChangesBuffer;
43+
private readonly IObserver<IChangeSet<TObject, TKey>> _downstreamObserver;
44+
private readonly Dictionary<TKey, ItemState> _itemStatesByKey;
45+
private readonly Func<TState, TObject, bool> _predicate;
46+
private readonly IDisposable? _predicateStateSubscription;
47+
private readonly IDisposable? _sourceSubscription;
48+
private readonly bool _suppressEmptyChangeSets;
49+
50+
private bool _hasPredicateStateCompleted;
51+
private bool _hasSourceCompleted;
52+
private bool _isLatestPredicateStateValid;
53+
private TState _latestPredicateState;
54+
55+
public Subscription(
56+
IObserver<IChangeSet<TObject, TKey>> downstreamObserver,
57+
Func<TState, TObject, bool> predicate,
58+
IObservable<TState> predicateState,
59+
IObservable<IChangeSet<TObject, TKey>> source,
60+
bool suppressEmptyChangeSets)
61+
{
62+
_downstreamObserver = downstreamObserver;
63+
_predicate = predicate;
64+
_suppressEmptyChangeSets = suppressEmptyChangeSets;
65+
66+
_downstreamChangesBuffer = new();
67+
_itemStatesByKey = new();
68+
69+
_latestPredicateState = default!;
70+
71+
var onError = new Action<Exception>(OnError);
72+
73+
_predicateStateSubscription = predicateState
74+
.SubscribeSafe(
75+
onNext: OnPredicateStateNext,
76+
onError: onError,
77+
onCompleted: OnPredicateStateCompleted);
78+
79+
_sourceSubscription = source
80+
.SubscribeSafe(
81+
onNext: OnSourceNext,
82+
onError: onError,
83+
onCompleted: OnSourceCompleted);
84+
}
85+
86+
public void Dispose()
87+
{
88+
_predicateStateSubscription?.Dispose();
89+
_sourceSubscription?.Dispose();
90+
}
91+
92+
private object DownstreamSynchronizationGate
93+
=> _downstreamChangesBuffer;
94+
95+
private object UpstreamSynchronizationGate
96+
=> _itemStatesByKey;
97+
98+
private IChangeSet<TObject, TKey> AssembleDownstreamChanges()
99+
{
100+
if (_downstreamChangesBuffer.Count is 0)
101+
return ChangeSet<TObject, TKey>.Empty;
102+
103+
var downstreamChanges = new ChangeSet<TObject, TKey>(_downstreamChangesBuffer);
104+
_downstreamChangesBuffer.Clear();
105+
106+
return downstreamChanges;
107+
}
108+
109+
private void OnError(Exception error)
110+
{
111+
using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate);
112+
113+
_predicateStateSubscription?.Dispose();
114+
_sourceSubscription?.Dispose();
115+
116+
@lock.SwapTo(DownstreamSynchronizationGate);
117+
118+
_downstreamObserver.OnError(error);
119+
}
120+
121+
private void OnPredicateStateCompleted()
122+
{
123+
using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate);
124+
125+
_hasPredicateStateCompleted = true;
126+
127+
// If we didn't get at least one predicateState value, we can't ever emit any (non-empty) downstream changesets,
128+
// no matter how many items come through from source, so just go ahead and complete now.
129+
if (_hasSourceCompleted || (!_isLatestPredicateStateValid && _suppressEmptyChangeSets))
130+
{
131+
@lock.SwapTo(DownstreamSynchronizationGate);
132+
133+
_downstreamObserver.OnCompleted();
134+
}
135+
}
136+
137+
private void OnPredicateStateNext(TState predicateState)
138+
{
139+
using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate);
140+
141+
_latestPredicateState = predicateState;
142+
_isLatestPredicateStateValid = true;
143+
144+
foreach (var key in _itemStatesByKey.Keys)
145+
{
146+
var itemState = _itemStatesByKey[key];
147+
148+
var isIncluded = _predicate.Invoke(predicateState, itemState.Item);
149+
150+
if (isIncluded && !itemState.IsIncluded)
151+
{
152+
_downstreamChangesBuffer.Add(new(
153+
reason: ChangeReason.Add,
154+
key: key,
155+
current: itemState.Item));
156+
}
157+
else if (!isIncluded && itemState.IsIncluded)
158+
{
159+
_downstreamChangesBuffer.Add(new(
160+
reason: ChangeReason.Remove,
161+
key: key,
162+
current: itemState.Item));
163+
}
164+
165+
_itemStatesByKey[key] = new()
166+
{
167+
IsIncluded = isIncluded,
168+
Item = itemState.Item
169+
};
170+
}
171+
172+
var downstreamChanges = AssembleDownstreamChanges();
173+
if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets)
174+
{
175+
@lock.SwapTo(DownstreamSynchronizationGate);
176+
177+
_downstreamObserver.OnNext(downstreamChanges);
178+
}
179+
}
180+
181+
private void OnSourceCompleted()
182+
{
183+
using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate);
184+
185+
_hasSourceCompleted = true;
186+
187+
// We can never emit any (non-empty) downstream changes in the future, if the collection is empty
188+
// and the source has reported that it'll never change, so go ahead and complete now.
189+
if (_hasPredicateStateCompleted || ((_itemStatesByKey.Count is 0) && _suppressEmptyChangeSets))
190+
{
191+
@lock.SwapTo(DownstreamSynchronizationGate);
192+
193+
_downstreamObserver.OnCompleted();
194+
}
195+
}
196+
197+
private void OnSourceNext(IChangeSet<TObject, TKey> upstreamChanges)
198+
{
199+
using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate);
200+
201+
foreach (var change in upstreamChanges.ToConcreteType())
202+
{
203+
switch (change.Reason)
204+
{
205+
case ChangeReason.Add:
206+
{
207+
var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, change.Current);
208+
209+
_itemStatesByKey.Add(
210+
key: change.Key,
211+
value: new()
212+
{
213+
IsIncluded = isIncluded,
214+
Item = change.Current
215+
});
216+
217+
if (isIncluded)
218+
{
219+
_downstreamChangesBuffer.Add(new(
220+
reason: ChangeReason.Add,
221+
key: change.Key,
222+
current: change.Current));
223+
}
224+
}
225+
break;
226+
227+
// Intentionally not supporting Moved changes, too much work to try and track indexes.
228+
229+
case ChangeReason.Refresh:
230+
{
231+
var itemState = _itemStatesByKey[change.Key];
232+
233+
var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, itemState.Item);
234+
235+
if (isIncluded && itemState.IsIncluded)
236+
{
237+
_downstreamChangesBuffer.Add(new(
238+
reason: ChangeReason.Refresh,
239+
key: change.Key,
240+
current: itemState.Item));
241+
}
242+
else if (isIncluded && !itemState.IsIncluded)
243+
{
244+
_downstreamChangesBuffer.Add(new(
245+
reason: ChangeReason.Add,
246+
key: change.Key,
247+
current: itemState.Item));
248+
}
249+
else if (!isIncluded && itemState.IsIncluded)
250+
{
251+
_downstreamChangesBuffer.Add(new(
252+
reason: ChangeReason.Remove,
253+
key: change.Key,
254+
current: itemState.Item));
255+
}
256+
257+
_itemStatesByKey[change.Key] = new()
258+
{
259+
IsIncluded = isIncluded,
260+
Item = itemState.Item
261+
};
262+
}
263+
break;
264+
265+
case ChangeReason.Remove:
266+
{
267+
var itemState = _itemStatesByKey[change.Key];
268+
269+
_itemStatesByKey.Remove(change.Key);
270+
271+
if (itemState.IsIncluded)
272+
{
273+
_downstreamChangesBuffer.Add(new(
274+
reason: ChangeReason.Remove,
275+
key: change.Key,
276+
current: itemState.Item));
277+
}
278+
}
279+
break;
280+
281+
case ChangeReason.Update:
282+
{
283+
var itemState = _itemStatesByKey[change.Key];
284+
285+
var isIncluded = _isLatestPredicateStateValid && _predicate.Invoke(_latestPredicateState, change.Current);
286+
287+
if (isIncluded && itemState.IsIncluded)
288+
{
289+
_downstreamChangesBuffer.Add(new(
290+
reason: ChangeReason.Update,
291+
key: change.Key,
292+
current: change.Current,
293+
previous: itemState.Item));
294+
}
295+
else if (isIncluded && !itemState.IsIncluded)
296+
{
297+
_downstreamChangesBuffer.Add(new(
298+
reason: ChangeReason.Add,
299+
key: change.Key,
300+
current: change.Current));
301+
}
302+
else if (!isIncluded && itemState.IsIncluded)
303+
{
304+
_downstreamChangesBuffer.Add(new(
305+
reason: ChangeReason.Remove,
306+
key: change.Key,
307+
current: itemState.Item));
308+
}
309+
310+
_itemStatesByKey[change.Key] = new()
311+
{
312+
IsIncluded = isIncluded,
313+
Item = change.Current
314+
};
315+
}
316+
break;
317+
}
318+
}
319+
320+
var downstreamChanges = AssembleDownstreamChanges();
321+
if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets)
322+
{
323+
@lock.SwapTo(DownstreamSynchronizationGate);
324+
325+
_downstreamObserver.OnNext(downstreamChanges);
326+
}
327+
}
328+
}
329+
330+
private readonly struct ItemState
331+
{
332+
public required bool IsIncluded { get; init; }
333+
334+
public required TObject Item { get; init; }
335+
}
336+
}
337+
}

‎src/DynamicData/Cache/ObservableCacheEx.cs

+28
Original file line numberDiff line numberDiff line change
@@ -1476,6 +1476,34 @@
14761476
return source.Filter(predicateChanged, Observable.Empty<Unit>(), suppressEmptyChangeSets);
14771477
}
14781478

1479+
/// <summary>
1480+
/// Creates a filtered stream which can be dynamically filtered, based on state values passed through to a static filtering predicate.
1481+
/// </summary>
1482+
/// <typeparam name="TObject">The type of the object.</typeparam>
1483+
/// <typeparam name="TKey">The type of the key.</typeparam>
1484+
/// <typeparam name="TState">The type of state value required by <paramref name="predicate"/>.</typeparam>
1485+
/// <param name="source">The source.</param>
1486+
/// <param name="predicateState">A stream of state values to be passed to <paramref name="predicate"/>.</param>
1487+
/// <param name="predicate">A static predicate to be used to determine which items should be included or excluded by the filter.</param>
1488+
/// <param name="suppressEmptyChangeSets">By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status.</param>
1489+
/// <returns>An observable which emits change sets.</returns>
1490+
/// <exception cref="ArgumentNullException">Throws for <paramref name="source"/>, <paramref name="predicateState"/>, and <paramref name="predicate"/>.</exception>
1491+
/// <remarks>
1492+
/// Usually, <paramref name="predicateState"/> should emit an initial value, immediately upon subscription. This is because <paramref name="predicate"/> cannot be invoked until the first state value is received, and accordingly, the operator will treat all items as excluded until then. Each value emitted by <paramref name="predicateState"/> will trigger a full re-filtering of the entire collection.
1493+
/// </remarks>
1494+
public static IObservable<IChangeSet<TObject, TKey>> Filter<TObject, TKey, TState>(
1495+
this IObservable<IChangeSet<TObject, TKey>> source,
1496+
IObservable<TState> predicateState,
1497+
Func<TState, TObject, bool> predicate,
1498+
bool suppressEmptyChangeSets = true)
1499+
where TObject : notnull
1500+
where TKey : notnull
1501+
=> Cache.Internal.Filter.WithPredicateState<TObject, TKey, TState>.Create(
1502+
source: source,
1503+
predicateState: predicateState,
1504+
predicate: predicate,
1505+
suppressEmptyChangeSets: suppressEmptyChangeSets);
1506+
14791507
/// <summary>
14801508
/// Creates a filtered stream which can be dynamically filtered.
14811509
/// </summary>
@@ -4137,7 +4165,7 @@
41374165
source = source ?? throw new ArgumentNullException(nameof(source));
41384166
expression = expression ?? throw new ArgumentNullException(nameof(expression));
41394167

41404168
return source.Sort(
41414169
sortOrder switch
41424170
{
41434171
SortDirection.Descending => SortExpressionComparer<TObject>.Descending(expression),
@@ -6399,7 +6427,7 @@
63996427
});
64006428
}
64016429

64026430
// TODO: Apply the Adapter to more places
64036431
private static Func<TObject, TKey, TResult> AdaptSelector<TObject, TKey, TResult>(Func<TObject, TResult> other)
64046432
where TObject : notnull
64056433
where TKey : notnull

‎src/DynamicData/List/Internal/Filter.WithPredicateState.cs

+1,267
Large diffs are not rendered by default.

‎src/DynamicData/List/ObservableListEx.cs

+29
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,35 @@ public static IObservable<IChangeSet<T>> Filter<T>(this IObservable<IChangeSet<T
712712
return new Filter<T>(source, predicate, filterPolicy).Run();
713713
}
714714

715+
/// <summary>
716+
/// Creates a filtered stream which can be dynamically filtered, based on state values passed through to a static filtering predicate.
717+
/// </summary>
718+
/// <typeparam name="T">The type of the item.</typeparam>
719+
/// <typeparam name="TState">The type of state value required by <paramref name="predicate"/>.</typeparam>
720+
/// <param name="source">The source.</param>
721+
/// <param name="predicateState">A stream of state values to be passed to <paramref name="predicate"/>.</param>
722+
/// <param name="predicate">A static predicate to be used to determine which items should be included or excluded by the filter.</param>
723+
/// <param name="filterPolicy">The policy that the operator should use when performing re-filtering operations.</param>
724+
/// <param name="suppressEmptyChangeSets">By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status.</param>
725+
/// <returns>An observable which emits change sets.</returns>
726+
/// <exception cref="ArgumentNullException">Throws for <paramref name="source"/>, <paramref name="predicateState"/>, and <paramref name="predicate"/>.</exception>
727+
/// <remarks>
728+
/// Usually, <paramref name="predicateState"/> should emit an initial value, immediately upon subscription. This is because <paramref name="predicate"/> cannot be invoked until the first state value is received, and accordingly, the operator will treat all items as excluded until then. Each value emitted by <paramref name="predicateState"/> will trigger a full re-filtering of the entire collection, according to <paramref name="filterPolicy"/>.
729+
/// </remarks>
730+
public static IObservable<IChangeSet<T>> Filter<T, TState>(
731+
this IObservable<IChangeSet<T>> source,
732+
IObservable<TState> predicateState,
733+
Func<TState, T, bool> predicate,
734+
ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff,
735+
bool suppressEmptyChangeSets = true)
736+
where T : notnull
737+
=> List.Internal.Filter.WithPredicateState<T, TState>.Create(
738+
source: source,
739+
predicateState: predicateState,
740+
predicate: predicate,
741+
filterPolicy: filterPolicy,
742+
suppressEmptyChangeSets: suppressEmptyChangeSets);
743+
715744
/// <summary>
716745
/// <para>Filters source on the specified observable property using the specified predicate.</para>
717746
/// <para>The filter will automatically reapply when a property changes.</para>

‎src/DynamicData/Polyfills/EnumEx.cs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
2+
// Roland Pheasant licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
namespace System;
6+
7+
internal static class EnumEx
8+
{
9+
#if NET5_0_OR_GREATER
10+
public static bool IsDefined<TEnum>(TEnum value)
11+
where TEnum : struct, Enum
12+
=> Enum.IsDefined(value);
13+
#else
14+
public static bool IsDefined<TEnum>(TEnum value)
15+
where TEnum : struct, Enum
16+
=> Enum.IsDefined(typeof(TEnum), value);
17+
#endif
18+
}

0 commit comments

Comments
 (0)
Please sign in to comment.