Skip to content

Commit

Permalink
fix: add timeout method to remote socket (#4558)
Browse files Browse the repository at this point in the history
The RemoteSocket interface, which is returned when the client is
connected on another Socket.IO server of the cluster, was lacking the
`timeout()` method.

Syntax:

```js
const sockets = await io.fetchSockets();

for (const socket of sockets) {
  if (someCondition) {
    socket.timeout(1000).emit("some-event", (err) => {
      if (err) {
        // the client did not acknowledge the event in the given delay
      }
    });
  }
}
```

Related: #4595
  • Loading branch information
walde1024 authored and darrachequesne committed Jan 24, 2023
1 parent f8640d9 commit 0c0eb00
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
EventNames,
EventsMap,
TypedEventBroadcaster,
DecorateAcknowledgements,
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
AllButLast,
Last,
Expand All @@ -20,7 +21,9 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
private readonly adapter: Adapter,
private readonly rooms: Set<Room> = new Set<Room>(),
private readonly exceptRooms: Set<Room> = new Set<Room>(),
private readonly flags: BroadcastFlags = {}
private readonly flags: BroadcastFlags & {
expectSingleResponse?: boolean;
} = {}
) {}

/**
Expand Down Expand Up @@ -232,7 +235,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>

const timer = setTimeout(() => {
timedOut = true;
ack.apply(this, [new Error("operation has timed out"), responses]);
ack.apply(this, [
new Error("operation has timed out"),
this.flags.expectSingleResponse ? null : responses,
]);
}, this.flags.timeout);

let expectedServerCount = -1;
Expand All @@ -246,7 +252,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
responses.length === expectedClientCount
) {
clearTimeout(timer);
ack.apply(this, [null, responses]);
ack.apply(this, [
null,
this.flags.expectSingleResponse ? null : responses,
]);
}
};

Expand Down Expand Up @@ -476,10 +485,44 @@ export class RemoteSocket<EmitEvents extends EventsMap, SocketData>
this.data = details.data;
this.operator = new BroadcastOperator<EmitEvents, SocketData>(
adapter,
new Set([this.id])
new Set([this.id]),
new Set(),
{
expectSingleResponse: true, // so that remoteSocket.emit() with acknowledgement behaves like socket.emit()
}
);
}

/**
* Adds a timeout in milliseconds for the next operation.
*
* @example
* const sockets = await io.fetchSockets();
*
* for (const socket of sockets) {
* if (someCondition) {
* socket.timeout(1000).emit("some-event", (err) => {
* if (err) {
* // the client did not acknowledge the event in the given delay
* }
* });
* }
* }
*
* // note: if possible, using a room instead of looping over all sockets is preferable
* io.timeout(1000).to(someConditionRoom).emit("some-event", (err, responses) => {
* // ...
* });
*
* @param timeout
*/
public timeout(timeout: number) {
return this.operator.timeout(timeout) as BroadcastOperator<
DecorateAcknowledgements<EmitEvents>,
SocketData
>;
}

public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
Expand Down

0 comments on commit 0c0eb00

Please sign in to comment.