Skip to content

Commit 8f61ce3

Browse files
bjoergebinoy14
authored andcommittedDec 20, 2024
fix(core): re-subscribes to shared pair listener opens a new connection (#8120)
1 parent d3d1540 commit 8f61ce3

File tree

3 files changed

+23
-18
lines changed

3 files changed

+23
-18
lines changed
 

‎packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts

+6-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import {type Action, type SanityClient} from '@sanity/client'
22
import {type Mutation} from '@sanity/mutator'
33
import {type SanityDocument} from '@sanity/types'
44
import {omit} from 'lodash'
5-
import {EMPTY, from, merge, type Observable, Subject} from 'rxjs'
5+
import {EMPTY, from, merge, type Observable} from 'rxjs'
66
import {filter, map, mergeMap, share, take, tap} from 'rxjs/operators'
77

88
import {
@@ -65,7 +65,7 @@ export interface Pair {
6565
transactionsPendingEvents$: Observable<PendingMutationsEvent>
6666
published: DocumentVersion
6767
draft: DocumentVersion
68-
complete: () => void
68+
_keepalive: Observable<never>
6969
}
7070

7171
function setVersion<T>(version: 'draft' | 'published') {
@@ -204,10 +204,7 @@ export function checkoutPair(
204204
): Pair {
205205
const {publishedId, draftId} = idPair
206206

207-
const listenerEventsConnector = new Subject<ListenerEvent>()
208-
const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe(
209-
share({connector: () => listenerEventsConnector}),
210-
)
207+
const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe(share())
211208

212209
const reconnect$ = listenerEvents$.pipe(
213210
filter((ev) => ev.type === 'reconnect'),
@@ -255,6 +252,8 @@ export function checkoutPair(
255252
consistency$: published.consistency$,
256253
remoteSnapshot$: published.remoteSnapshot$.pipe(map(setVersion('published'))),
257254
},
258-
complete: () => listenerEventsConnector.complete(),
255+
// Use this to keep the mutation pipeline active.
256+
// It won't ever emit any events, but it will prevent the eventsource connection from completing for as long as it is subscribed to
257+
_keepalive: commits$.pipe(mergeMap(() => EMPTY)),
259258
}
260259
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import {type SanityClient} from '@sanity/client'
2-
import {Observable} from 'rxjs'
3-
import {publishReplay, refCount} from 'rxjs/operators'
2+
import {merge, type Observable, of, ReplaySubject, share, timer} from 'rxjs'
43

54
import {type PairListenerOptions} from '../getPairListener'
65
import {type IdPair} from '../types'
76
import {memoize} from '../utils/createMemoizer'
87
import {checkoutPair, type Pair} from './checkoutPair'
98
import {memoizeKeyGen} from './memoizeKeyGen'
109

10+
// How long to keep listener connected for after last unsubscribe
11+
const LISTENER_RESET_DELAY = 10_000
12+
1113
export const memoizedPair: (
1214
client: SanityClient,
1315
idPair: IdPair,
@@ -22,12 +24,18 @@ export const memoizedPair: (
2224
serverActionsEnabled: Observable<boolean>,
2325
pairListenerOptions?: PairListenerOptions,
2426
): Observable<Pair> => {
25-
return new Observable<Pair>((subscriber) => {
26-
const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions)
27-
subscriber.next(pair)
28-
29-
return pair.complete
30-
}).pipe(publishReplay(1), refCount())
27+
const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions)
28+
return merge(
29+
of(pair),
30+
// makes sure the pair listener is kept alive for as long as there are subscribers
31+
pair._keepalive,
32+
).pipe(
33+
share({
34+
connector: () => new ReplaySubject(1),
35+
resetOnComplete: true,
36+
resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY),
37+
}),
38+
)
3139
},
3240
memoizeKeyGen,
3341
)

‎packages/sanity/src/core/store/_legacy/document/getPairListener.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
import {type SanityClient} from '@sanity/client'
33
import {type SanityDocument} from '@sanity/types'
44
import {groupBy} from 'lodash'
5-
import {defer, merge, type Observable, of, throwError, timer} from 'rxjs'
5+
import {defer, merge, type Observable, of, throwError} from 'rxjs'
66
import {catchError, concatMap, filter, map, mergeMap, scan, share} from 'rxjs/operators'
77

8-
import {LISTENER_RESET_DELAY} from '../../../preview/constants'
98
import {shareReplayLatest} from '../../../preview/utils/shareReplayLatest'
109
import {debug} from './debug'
1110
import {
@@ -96,7 +95,6 @@ export function getPairListener(
9695
//filter((event) => Math.random() < 0.99 || event.type !== 'mutation'),
9796
shareReplayLatest({
9897
predicate: (event) => event.type === 'welcome' || event.type === 'reconnect',
99-
resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY),
10098
}),
10199
),
102100
) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>

0 commit comments

Comments
 (0)
Please sign in to comment.