Skip to content

Commit dd1f54d

Browse files
authoredDec 24, 2024··
Fixed initialization logic for Join operators, to ensure that only one initial changeset is emitted, and that it emits only after both sources have emitted their initialization changeset. (#945)
1 parent 95b94d3 commit dd1f54d

File tree

6 files changed

+183
-77
lines changed

6 files changed

+183
-77
lines changed
 

‎src/DynamicData.Tests/Cache/InnerJoinFixture.cs

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System;
22

3+
using DynamicData.Tests.Utilities;
4+
35
using FluentAssertions;
46

57
using Xunit;
@@ -206,6 +208,30 @@ public void MoreRight()
206208

207209
}
208210

211+
[Fact]
212+
public void InitializationWaitsForBothSources()
213+
{
214+
var left = new[] { 1, 2, 3 };
215+
var right = new[] { 4, 6, 2 };
216+
217+
ObservableCacheEx
218+
.InnerJoin(
219+
left: left.AsObservableChangeSet(static left => 2 * left),
220+
right: right.AsObservableChangeSet(static right => right),
221+
rightKeySelector: static right => right,
222+
resultSelector: static (left, right) => (left, right))
223+
.ValidateSynchronization()
224+
.ValidateChangeSets(static pair => (2 * pair.left, pair.right))
225+
.RecordCacheItems(out var results);
226+
227+
results.Error.Should().BeNull();
228+
229+
results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
230+
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");
231+
232+
results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
233+
}
234+
209235
public class Device(string name) : IEquatable<Device>
210236
{
211237
public string Name { get; } = name;

‎src/DynamicData.Tests/Cache/LeftJoinFixture.cs

+27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Linq;
33

44
using DynamicData.Kernel;
5+
using DynamicData.Tests.Utilities;
56

67
using FluentAssertions;
78

@@ -165,6 +166,32 @@ public void UpdateRight()
165166
_result.Data.Items.All(dwm => dwm.MetaData != Optional<DeviceMetaData>.None).Should().BeTrue();
166167
}
167168

169+
[Fact]
170+
public void InitializationWaitsForBothSources()
171+
{
172+
// https://github.com/reactivemarbles/DynamicData/issues/943
173+
174+
var left = new[] { 1, 2, 3 };
175+
var right = new[] { 4, 6, 2 };
176+
177+
ObservableCacheEx
178+
.LeftJoin(
179+
left: left.AsObservableChangeSet(static left => 2 * left),
180+
right: right.AsObservableChangeSet(static right => right),
181+
rightKeySelector: static right => right,
182+
resultSelector: static (left, right) => (left, right: right.HasValue ? right.Value : default(int?)))
183+
.ValidateSynchronization()
184+
.ValidateChangeSets(static pair => 2 * pair.left)
185+
.RecordCacheItems(out var results);
186+
187+
results.Error.Should().BeNull();
188+
189+
results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
190+
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");
191+
192+
results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
193+
}
194+
168195
public class Device(string name) : IEquatable<Device>
169196
{
170197
public string Name { get; } = name;

‎src/DynamicData.Tests/Cache/RightJoinFixture.cs

+23
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Linq;
33

44
using DynamicData.Kernel;
5+
using DynamicData.Tests.Utilities;
56

67
using FluentAssertions;
78

@@ -216,7 +217,29 @@ public void MoreRight()
216217
_result.Data.Items.Count(dwm => dwm.Device == Optional<Device>.None).Should().Be(1);
217218
}
218219

220+
[Fact]
221+
public void InitializationWaitsForBothSources()
222+
{
223+
var left = new[] { 1, 2, 3 };
224+
var right = new[] { 4, 6, 2 };
225+
226+
ObservableCacheEx
227+
.RightJoin(
228+
left: left.AsObservableChangeSet(static left => 2 * left),
229+
right: right.AsObservableChangeSet(static right => right),
230+
rightKeySelector: static right => right,
231+
resultSelector: static (left, right) => (left: left.HasValue ? left.Value : default(int?), right))
232+
.ValidateSynchronization()
233+
.ValidateChangeSets(static pair => pair.right)
234+
.RecordCacheItems(out var results);
219235

236+
results.Error.Should().BeNull();
237+
238+
results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
239+
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");
240+
241+
results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
242+
}
220243

221244
public class Device(string name) : IEquatable<Device>
222245
{

‎src/DynamicData/Cache/Internal/InnerJoin.cs

+45-36
Original file line numberDiff line numberDiff line change
@@ -37,47 +37,52 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
3737
// joined is the final cache
3838
var joinedCache = new ChangeAwareCache<TDestination, (TLeftKey, TRightKey)>();
3939

40-
var leftLoader = leftCache.Connect().Select(changes =>
41-
{
42-
foreach (var change in changes.ToConcreteType())
43-
{
44-
var leftCurrent = change.Current;
45-
var rightLookup = rightGrouped.Lookup(change.Key);
40+
var hasInitialized = false;
4641

47-
if (rightLookup.HasValue)
42+
var leftLoader = leftCache.Connect()
43+
.Select(changes =>
44+
{
45+
foreach (var change in changes.ToConcreteType())
4846
{
49-
switch (change.Reason)
47+
var leftCurrent = change.Current;
48+
var rightLookup = rightGrouped.Lookup(change.Key);
49+
50+
if (rightLookup.HasValue)
5051
{
51-
case ChangeReason.Add:
52-
case ChangeReason.Update:
53-
foreach (var keyvalue in rightLookup.Value.KeyValues)
54-
{
55-
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key));
56-
}
57-
58-
break;
59-
60-
case ChangeReason.Remove:
61-
foreach (var keyvalue in rightLookup.Value.KeyValues)
62-
{
63-
joinedCache.Remove((change.Key, keyvalue.Key));
64-
}
65-
66-
break;
67-
68-
case ChangeReason.Refresh:
69-
foreach (var key in rightLookup.Value.Keys)
70-
{
71-
joinedCache.Refresh((change.Key, key));
72-
}
73-
74-
break;
52+
switch (change.Reason)
53+
{
54+
case ChangeReason.Add:
55+
case ChangeReason.Update:
56+
foreach (var keyvalue in rightLookup.Value.KeyValues)
57+
{
58+
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key));
59+
}
60+
61+
break;
62+
63+
case ChangeReason.Remove:
64+
foreach (var keyvalue in rightLookup.Value.KeyValues)
65+
{
66+
joinedCache.Remove((change.Key, keyvalue.Key));
67+
}
68+
69+
break;
70+
71+
case ChangeReason.Refresh:
72+
foreach (var key in rightLookup.Value.Keys)
73+
{
74+
joinedCache.Refresh((change.Key, key));
75+
}
76+
77+
break;
78+
}
7579
}
7680
}
77-
}
7881

79-
return joinedCache.CaptureChanges();
80-
});
82+
return joinedCache.CaptureChanges();
83+
})
84+
// Don't forward initial changesets from the left side, only the right
85+
.Where(_ => hasInitialized);
8186

8287
var rightLoader = rightCache.Connect().Select(changes =>
8388
{
@@ -119,7 +124,11 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
119124

120125
lock (locker)
121126
{
122-
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
127+
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
128+
129+
hasInitialized = true;
130+
131+
return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect());
123132
}
124133
});
125134
}

‎src/DynamicData/Cache/Internal/LeftJoin.cs

+14-5
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
3030
var locker = new object();
3131

3232
// create local backing stores
33-
var leftCache = _left.Synchronize(locker).AsObservableCache(false);
33+
var leftShare = _left.Synchronize(locker).Publish();
34+
var leftCache = leftShare.AsObservableCache(false);
3435
var rightCache = _right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache(false);
3536

3637
// joined is the final cache
3738
var joined = new ChangeAwareCache<TDestination, TLeftKey>();
3839

40+
var hasInitialized = false;
41+
3942
var leftLoader = leftCache.Connect().Select(
4043
changes =>
4144
{
@@ -66,8 +69,8 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
6669
return joined.CaptureChanges();
6770
});
6871

69-
var rightLoader = rightCache.Connect().Select(
70-
changes =>
72+
var rightLoader = rightCache.Connect()
73+
.Select(changes =>
7174
{
7275
foreach (var change in changes.ToConcreteType())
7376
{
@@ -117,11 +120,17 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
117120
}
118121

119122
return joined.CaptureChanges();
120-
});
123+
})
124+
// Don't forward initial changesets from the right side, only the left
125+
.Where(_ => hasInitialized);
121126

122127
lock (locker)
123128
{
124-
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
129+
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
130+
131+
hasInitialized = true;
132+
133+
return new CompositeDisposable(observerSubscription, leftCache, rightCache, leftShare.Connect());
125134
}
126135
});
127136
}

‎src/DynamicData/Cache/Internal/RightJoin.cs

+48-36
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea
4040
// joined is the final cache
4141
var joinedCache = new ChangeAwareCache<TDestination, TRightKey>();
4242

43+
var hasInitialized = false;
44+
4345
var rightLoader = rightCache.Connect().Select(changes =>
4446
{
4547
foreach (var change in changes.ToConcreteType())
@@ -70,48 +72,58 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea
7072
return joinedCache.CaptureChanges();
7173
});
7274

73-
var leftLoader = leftCache.Connect().Select(changes =>
74-
{
75-
foreach (var change in changes.ToConcreteType())
75+
var leftLoader = leftCache.Connect()
76+
.Select(changes =>
7677
{
77-
var left = change.Current;
78-
var right = rightGrouped.Lookup(change.Key);
79-
80-
if (right.HasValue)
78+
foreach (var change in changes.ToConcreteType())
8179
{
82-
switch (change.Reason)
80+
var left = change.Current;
81+
var right = rightGrouped.Lookup(change.Key);
82+
83+
if (right.HasValue)
8384
{
84-
case ChangeReason.Add:
85-
case ChangeReason.Update:
86-
foreach (var keyvalue in right.Value.KeyValues)
87-
{
88-
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key);
89-
}
90-
91-
break;
92-
93-
case ChangeReason.Remove:
94-
foreach (var keyvalue in right.Value.KeyValues)
95-
{
96-
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key);
97-
}
98-
99-
break;
100-
101-
case ChangeReason.Refresh:
102-
foreach (var key in right.Value.Keys)
103-
{
104-
joinedCache.Refresh(key);
105-
}
106-
107-
break;
85+
switch (change.Reason)
86+
{
87+
case ChangeReason.Add:
88+
case ChangeReason.Update:
89+
foreach (var keyvalue in right.Value.KeyValues)
90+
{
91+
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key);
92+
}
93+
94+
break;
95+
96+
case ChangeReason.Remove:
97+
foreach (var keyvalue in right.Value.KeyValues)
98+
{
99+
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key);
100+
}
101+
102+
break;
103+
104+
case ChangeReason.Refresh:
105+
foreach (var key in right.Value.Keys)
106+
{
107+
joinedCache.Refresh(key);
108+
}
109+
110+
break;
111+
}
108112
}
109113
}
110-
}
111114

112-
return joinedCache.CaptureChanges();
113-
});
115+
return joinedCache.CaptureChanges();
116+
})
117+
// Don't forward initial changesets from the left side, only the right
118+
.Where(_ => hasInitialized);
119+
120+
lock (locker)
121+
{
122+
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
123+
124+
hasInitialized = true;
114125

115-
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
126+
return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect());
127+
}
116128
});
117129
}

0 commit comments

Comments
 (0)
Please sign in to comment.