1
- import { type Action , type SanityClient } from '@sanity/client'
1
+ import {
2
+ type Action ,
3
+ type MultipleActionResult ,
4
+ type SanityClient ,
5
+ type SingleMutationResult ,
6
+ } from '@sanity/client'
2
7
import { type Mutation } from '@sanity/mutator'
3
8
import { type SanityDocument } from '@sanity/types'
4
9
import { omit } from 'lodash'
5
- import { EMPTY , from , merge , type Observable } from 'rxjs'
6
- import { filter , map , mergeMap , share , take , tap } from 'rxjs/operators'
10
+ import { defer , EMPTY , from , merge , type Observable } from 'rxjs'
11
+ import { filter , map , mergeMap , scan , share , take , tap , withLatestFrom } from 'rxjs/operators'
7
12
8
13
import { type DocumentVariantType } from '../../../../util/getDocumentVariantType'
9
14
import {
@@ -13,11 +18,19 @@ import {
13
18
type MutationPayload ,
14
19
type RemoteSnapshotEvent ,
15
20
} from '../buffered-doc'
16
- import { getPairListener , type ListenerEvent , type PairListenerOptions } from '../getPairListener'
21
+ import {
22
+ type DocumentStoreExtraOptions ,
23
+ getPairListener ,
24
+ type LatencyReportEvent ,
25
+ type ListenerEvent ,
26
+ } from '../getPairListener'
17
27
import { type IdPair , type PendingMutationsEvent , type ReconnectEvent } from '../types'
18
28
import { actionsApiClient } from './utils/actionsApiClient'
19
29
import { operationsApiClient } from './utils/operationsApiClient'
20
30
31
+ /** Timeout on request that fetches shard name before reporting latency */
32
+ const FETCH_SHARD_TIMEOUT = 20_000
33
+
21
34
const isMutationEventForDocId =
22
35
( id : string ) =>
23
36
(
@@ -41,6 +54,13 @@ export type DocumentVersionEvent = WithVersion<ReconnectEvent | BufferedDocument
41
54
* @beta */
42
55
export type RemoteSnapshotVersionEvent = WithVersion < RemoteSnapshotEvent >
43
56
57
+ /**
58
+ * @hidden
59
+ * @beta
60
+ * The SingleMutationResult type from `@sanity/client` doesn't reflect what we get back from POST /mutate
61
+ */
62
+ export type MutationResult = Omit < SingleMutationResult , 'documentId' >
63
+
44
64
/**
45
65
* @hidden
46
66
* @beta */
@@ -156,7 +176,11 @@ function commitActions(client: SanityClient, idPair: IdPair, mutationParams: Mut
156
176
} )
157
177
}
158
178
159
- function commitMutations ( client : SanityClient , idPair : IdPair , mutationParams : Mutation [ 'params' ] ) {
179
+ function commitMutations (
180
+ client : SanityClient ,
181
+ idPair : IdPair ,
182
+ mutationParams : Mutation [ 'params' ] ,
183
+ ) : Promise < MutationResult > {
160
184
const { resultRev, ...mutation } = mutationParams
161
185
return operationsApiClient ( client , idPair ) . dataRequest ( 'mutate' , mutation , {
162
186
visibility : 'async' ,
@@ -173,7 +197,7 @@ function submitCommitRequest(
173
197
idPair : IdPair ,
174
198
request : CommitRequest ,
175
199
serverActionsEnabled : boolean ,
176
- ) {
200
+ ) : Observable < MultipleActionResult | MutationResult > {
177
201
return from (
178
202
serverActionsEnabled
179
203
? commitActions ( client , idPair , request . mutation . params )
@@ -197,16 +221,30 @@ function submitCommitRequest(
197
221
)
198
222
}
199
223
224
+ type LatencyTrackingEvent = {
225
+ transactionId : string
226
+ submittedAt : Date
227
+ receivedAt : Date
228
+ deltaMs : number
229
+ }
230
+
231
+ type LatencyTrackingState = {
232
+ pending : { type : 'receive' | 'submit' ; transactionId : string ; timestamp : Date } [ ]
233
+ event : LatencyTrackingEvent | undefined
234
+ }
235
+
200
236
/** @internal */
201
237
export function checkoutPair (
202
238
client : SanityClient ,
203
239
idPair : IdPair ,
204
240
serverActionsEnabled : Observable < boolean > ,
205
- pairListenerOptions ?: PairListenerOptions ,
241
+ options : DocumentStoreExtraOptions = { } ,
206
242
) : Pair {
207
243
const { publishedId, draftId, versionId} = idPair
208
244
209
- const listenerEvents$ = getPairListener ( client , idPair , pairListenerOptions ) . pipe ( share ( ) )
245
+ const { onReportLatency, onSyncErrorRecovery, tag} = options
246
+
247
+ const listenerEvents$ = getPairListener ( client , idPair , { onSyncErrorRecovery, tag} ) . pipe ( share ( ) )
210
248
211
249
const reconnect$ = listenerEvents$ . pipe (
212
250
filter ( ( ev ) => ev . type === 'reconnect' ) ,
@@ -248,6 +286,19 @@ export function checkoutPair(
248
286
) ,
249
287
) ,
250
288
) ,
289
+ )
290
+
291
+ // Note: we're only subscribing to this for the side-effect
292
+ const combinedEvents = defer ( ( ) =>
293
+ onReportLatency
294
+ ? reportLatency ( {
295
+ commits$ : commits$ ,
296
+ listenerEvents$ : listenerEvents$ ,
297
+ client,
298
+ onReportLatency,
299
+ } )
300
+ : merge ( commits$ , listenerEvents$ ) ,
301
+ ) . pipe (
251
302
mergeMap ( ( ) => EMPTY ) ,
252
303
share ( ) ,
253
304
)
@@ -256,22 +307,92 @@ export function checkoutPair(
256
307
transactionsPendingEvents$,
257
308
draft : {
258
309
...draft ,
259
- events : merge ( commits$ , reconnect$ , draft . events ) . pipe ( map ( setVersion ( 'draft' ) ) ) ,
310
+ events : merge ( combinedEvents , reconnect$ , draft . events ) . pipe ( map ( setVersion ( 'draft' ) ) ) ,
260
311
remoteSnapshot$ : draft . remoteSnapshot$ . pipe ( map ( setVersion ( 'draft' ) ) ) ,
261
312
} ,
262
313
...( typeof version === 'undefined'
263
314
? { }
264
315
: {
265
316
version : {
266
317
...version ,
267
- events : merge ( commits$ , reconnect$ , version . events ) . pipe ( map ( setVersion ( 'version' ) ) ) ,
318
+ events : merge ( combinedEvents , reconnect$ , version . events ) . pipe (
319
+ map ( setVersion ( 'version' ) ) ,
320
+ ) ,
268
321
remoteSnapshot$ : version . remoteSnapshot$ . pipe ( map ( setVersion ( 'version' ) ) ) ,
269
322
} ,
270
323
} ) ,
271
324
published : {
272
325
...published ,
273
- events : merge ( commits$ , reconnect$ , published . events ) . pipe ( map ( setVersion ( 'published' ) ) ) ,
326
+ events : merge ( combinedEvents , reconnect$ , published . events ) . pipe (
327
+ map ( setVersion ( 'published' ) ) ,
328
+ ) ,
274
329
remoteSnapshot$ : published . remoteSnapshot$ . pipe ( map ( setVersion ( 'published' ) ) ) ,
275
330
} ,
276
331
}
277
332
}
333
+
334
+ function reportLatency ( options : {
335
+ commits$ : Observable < MultipleActionResult | MutationResult >
336
+ listenerEvents$ : Observable < ListenerEvent >
337
+ client : SanityClient
338
+ onReportLatency : ( event : LatencyReportEvent ) => void
339
+ } ) {
340
+ const { client, commits$, listenerEvents$, onReportLatency} = options
341
+ // Note: this request happens once and the result is then cached indefinitely
342
+ const shardInfo = fetch ( client . getUrl ( client . getDataUrl ( 'ping' ) ) , {
343
+ signal : AbortSignal . timeout ( FETCH_SHARD_TIMEOUT ) ,
344
+ } )
345
+ . then ( ( response ) => response . headers . get ( 'X-Sanity-Shard' ) || undefined )
346
+ . catch ( ( ) => undefined )
347
+
348
+ const submittedMutations = commits$ . pipe (
349
+ map ( ( ev ) => ( {
350
+ type : 'submit' as const ,
351
+ transactionId : ev . transactionId ,
352
+ timestamp : new Date ( ) ,
353
+ } ) ) ,
354
+ share ( ) ,
355
+ )
356
+
357
+ const receivedMutations = listenerEvents$ . pipe (
358
+ filter ( ( ev ) => ev . type === 'mutation' ) ,
359
+ map ( ( ev ) => ( {
360
+ type : 'receive' as const ,
361
+ transactionId : ev . transactionId ,
362
+ timestamp : new Date ( ) ,
363
+ } ) ) ,
364
+ share ( ) ,
365
+ )
366
+
367
+ return merge ( submittedMutations , receivedMutations ) . pipe (
368
+ scan (
369
+ ( state : LatencyTrackingState , event ) : LatencyTrackingState => {
370
+ const matchingIndex = state . pending . findIndex (
371
+ ( e ) => e . transactionId === event . transactionId ,
372
+ )
373
+ if ( matchingIndex > - 1 ) {
374
+ const matching = state . pending [ matchingIndex ]
375
+ const [ submitEvent , receiveEvent ] =
376
+ matching . type == 'submit' ? [ matching , event ] : [ event , matching ]
377
+ return {
378
+ event : {
379
+ transactionId : event . transactionId ,
380
+ submittedAt : submitEvent . timestamp ,
381
+ receivedAt : submitEvent . timestamp ,
382
+ deltaMs : receiveEvent . timestamp . getTime ( ) - submitEvent . timestamp . getTime ( ) ,
383
+ } ,
384
+ pending : state . pending . toSpliced ( matchingIndex , 1 ) ,
385
+ }
386
+ }
387
+ return { event : undefined , pending : state . pending . concat ( event ) }
388
+ } ,
389
+ { event : undefined , pending : [ ] } ,
390
+ ) ,
391
+ map ( ( state ) => state . event ) ,
392
+ filter ( ( event ) => ! ! event ) ,
393
+ withLatestFrom ( shardInfo ) ,
394
+ tap ( ( [ event , shard ] ) =>
395
+ onReportLatency ?.( { latencyMs : event . deltaMs , shard, transactionId : event . transactionId } ) ,
396
+ ) ,
397
+ )
398
+ }
0 commit comments