Commit 963a395

dmaier-redislabs and tishun authored Mar 5, 2025

feat: Sharded pub/sub support via dedicated subscribers (#1956)

* Added sharded pub/sub support by implementing a cluster subscriber group.
* Rewrote the resubscribe logic for sharded Pub/Sub by making the sharded subscriber group aware of the channels.
* Fixed potentially leaking connections when calling disconnect on the cluster object.
* Added and extended the integration test cases for sharded Pub/Sub.
* Added a GitHub Action that allows running tests manually.
* Provided documentation on how to use sharded Pub/Sub.

Co-authored-by: Tihomir Krasimirov Mateev <tihomir.mateev@redis.com>

1 parent 65aed15 · commit 963a395

12 files changed: +728 −64
 

.github/workflows/release.yml (+1 −1)

@@ -8,7 +8,7 @@ concurrency:

 jobs:
   test:
-    uses: ./.github/workflows/test.yml
+    uses: ./.github/workflows/test_with_cov.yml
   release:
     runs-on: ubuntu-latest
     needs: test

.github/workflows/test.yml (+1 −25)

@@ -11,7 +11,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        node: [12.x, 14.x, 16.x, 18.x]
+        node: [12.x, 14.x, 16.x, 18.x, 20.x]
     steps:
       - name: Git checkout
         uses: actions/checkout@v2
@@ -33,27 +33,3 @@ jobs:
       - run: npm run build
       - run: npm run test:tsd
       - run: npm run test:cov || npm run test:cov || npm run test:cov
-      - name: Coveralls
-        if: matrix.node == '18.x'
-        uses: coverallsapp/github-action@master
-        with:
-          github-token: ${{ secrets.GITHUB_TOKEN }}
-          flag-name: node-${{matrix.node}}
-          parallel: true
-
-  # test-cluster:
-  #   runs-on: ubuntu-latest
-  #   steps:
-  #     - uses: actions/checkout@v2
-  #     - name: Build and test cluster
-  #       run: bash test/cluster/docker/main.sh
-
-  code-coverage:
-    needs: test
-    runs-on: ubuntu-latest
-    steps:
-      - name: Coveralls
-        uses: coverallsapp/github-action@master
-        with:
-          github-token: ${{ secrets.GITHUB_TOKEN }}
-          parallel-finished: true

.github/workflows/test_with_cov.yml (+1 −1)

@@ -7,7 +7,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        node: [12.x, 14.x, 16.x, 18.x]
+        node: [12.x, 14.x, 16.x, 18.x, 20.x]
     steps:
       - name: Git checkout
         uses: actions/checkout@v2

README.md (+33 −1)

@@ -46,7 +46,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
 | Version        | Branch | Node.js Version | Redis Version   |
 | -------------- | ------ | --------------- | --------------- |
 | 5.x.x (latest) | main   | >= 12           | 2.6.12 ~ latest |
-| 4.x.x          | v4     | >= 6            | 2.6.12 ~ 7      |
+| 4.x.x          | v4     | >= 8            | 2.6.12 ~ 7      |

 Refer to [CHANGELOG.md](CHANGELOG.md) for features and bug fixes introduced in v5.

@@ -1196,6 +1196,38 @@ sub.subscribe("news", () => {
 });
 ```

+### Sharded Pub/Sub
+
+For sharded Pub/Sub, use the `spublish` and `ssubscribe` commands instead of the traditional `publish` and `subscribe`. With the old commands, the Redis cluster handles message propagation behind the scenes, allowing you to publish or subscribe to any node without considering sharding. However, this approach has scalability limitations that are addressed by sharded Pub/Sub. Here's what you need to know:
+
+1. Instead of a single subscriber connection, there is now one subscriber connection per shard. Because of the potential overhead, you can enable or disable the use of the cluster subscriber group with the `shardedSubscribers` option. By default, this option is set to `false`, meaning sharded subscriptions are disabled. Enable it when establishing your cluster connection, before using `ssubscribe`.
+2. All channel names that you pass to a single `ssubscribe` call need to map to the same hash slot. You can call `ssubscribe` multiple times on the same cluster client instance to subscribe to channels across slots. The cluster's subscriber group takes care of forwarding the `ssubscribe` command to the shard that is responsible for the channels.
+
+The following basic example shows how to use sharded Pub/Sub:
+
+```javascript
+const cluster = new Cluster([{ host, port }], { shardedSubscribers: true });
+
+// Register the callback
+cluster.on("smessage", (channel, message) => {
+  console.log(message);
+});
+
+// Subscribe to channels that map to the same slot
+cluster
+  .ssubscribe("channel{my}:1", "channel{my}:2")
+  .then((count) => {
+    console.log(count);
+  })
+  .catch((err) => {
+    console.log(err);
+  });
+
+// Publish a message
+cluster
+  .spublish("channel{my}:1", "This is a test message to my first channel.")
+  .then((value) => {
+    console.log("Published a message to channel{my}:1");
+  });
+```
+
 ### Events

 | Event | Description |
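Beyond the README example, it helps to see how slot scoping shapes the API surface. The following sketch is illustrative only (the channel names are made up, and the `cluster` instance is assumed to be the one from the snippet above); it shows splitting subscriptions across hash tags and the rejection you get when mixing tags in one call:

```typescript
// Channels with different hash tags generally live in different slots,
// so they must be subscribed in separate ssubscribe calls.
await cluster.ssubscribe("news{eu}:politics", "news{eu}:sports"); // one slot
await cluster.ssubscribe("news{us}:politics"); // (usually) another slot

// sunsubscribe is slot-scoped in the same way
await cluster.sunsubscribe("news{eu}:politics", "news{eu}:sports");

// Mixing hash tags in a single call is rejected as a cross-slot operation
try {
  await cluster.ssubscribe("news{eu}:weather", "news{us}:weather");
} catch (err) {
  console.error("Expected rejection:", err);
}
```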

lib/cluster/ClusterOptions.ts (+12 −0)

@@ -121,6 +121,17 @@ export interface ClusterOptions extends CommanderOptions {
    */
   slotsRefreshInterval?: number;

+  /**
+   * Use sharded subscribers instead of a single subscriber.
+   *
+   * If sharded subscribers are used, then one additional subscriber connection per master node
+   * is established. If you don't plan to use SPUBLISH/SSUBSCRIBE, then this should be disabled.
+   *
+   * @default false
+   */
+  shardedSubscribers?: boolean;
+
   /**
    * Passed to the constructor of `Redis`
    *
@@ -216,4 +227,5 @@ export const DEFAULT_CLUSTER_OPTIONS: ClusterOptions = {
   dnsLookup: lookup,
   enableAutoPipelining: false,
   autoPipeliningIgnoredCommands: [],
+  shardedSubscribers: false,
 };
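A minimal sketch of consuming the new option (host and port values are placeholders). With the default of `false`, sharded Pub/Sub commands keep going through the single shared subscriber path:

```typescript
import { Cluster } from "ioredis";

// Default: shardedSubscribers is false, so no per-shard subscriber connections
const plain = new Cluster([{ host: "127.0.0.1", port: 30000 }]);

// Opt in: one additional subscriber connection per master node is established
const sharded = new Cluster(
  [{ host: "127.0.0.1", port: 30000 }],
  { shardedSubscribers: true }
);
```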

lib/cluster/ClusterSubscriber.ts (+45 −5)

@@ -8,12 +8,18 @@ const debug = Debug("cluster:subscriber");

 export default class ClusterSubscriber {
   private started = false;
+
+  // There is only one connection for the entire pool
   private subscriber: any = null;
   private lastActiveSubscriber: any;

+  // The slot range for which this subscriber is responsible
+  private slotRange: number[] = [];
+
   constructor(
     private connectionPool: ConnectionPool,
-    private emitter: EventEmitter
+    private emitter: EventEmitter,
+    private isSharded: boolean = false
   ) {
     // If the current node we're using as the subscriber disappears
     // from the node pool for some reason, we will select a new one
@@ -47,6 +53,22 @@ export default class ClusterSubscriber {
     return this.subscriber;
   }

+  /**
+   * Associate this subscriber with a specific slot range.
+   *
+   * Returns the range, or an empty array if the slot range couldn't be associated.
+   * This is mostly useful for debugging and testing purposes.
+   *
+   * @param range
+   */
+  associateSlotRange(range: number[]): number[] {
+    if (this.isSharded) {
+      this.slotRange = range;
+    }
+    return this.slotRange;
+  }
+
   start(): void {
     this.started = true;
     this.selectSubscriber();
@@ -59,9 +81,13 @@ export default class ClusterSubscriber {
       this.subscriber.disconnect();
       this.subscriber = null;
     }
-    debug("stopped");
   }

+  isStarted(): boolean {
+    return this.started;
+  }
+
   private onSubscriberEnd = () => {
     if (!this.started) {
       debug(
@@ -112,13 +138,17 @@ export default class ClusterSubscriber {
      * provided for the subscriber is correct, and if not, the current subscriber
      * will be disconnected and a new subscriber will be selected.
      */
+    let connectionPrefix = "subscriber";
+    if (this.isSharded)
+      connectionPrefix = "ssubscriber";
+
     this.subscriber = new Redis({
       port: options.port,
       host: options.host,
       username: options.username,
       password: options.password,
       enableReadyCheck: true,
-      connectionName: getConnectionName("subscriber", options.connectionName),
+      connectionName: getConnectionName(connectionPrefix, options.connectionName),
       lazyConnect: true,
       tls: options.tls,
       // Don't try to reconnect the subscriber connection. If the connection fails
@@ -179,17 +209,27 @@ export default class ClusterSubscriber {
     for (const event of [
       "message",
       "messageBuffer",
-      "smessage",
-      "smessageBuffer",
     ]) {
       this.subscriber.on(event, (arg1, arg2) => {
         this.emitter.emit(event, arg1, arg2);
       });
     }
+
     for (const event of ["pmessage", "pmessageBuffer"]) {
       this.subscriber.on(event, (arg1, arg2, arg3) => {
         this.emitter.emit(event, arg1, arg2, arg3);
       });
     }
+
+    if (this.isSharded) {
+      for (const event of ["smessage", "smessageBuffer"]) {
+        this.subscriber.on(event, (arg1, arg2) => {
+          this.emitter.emit(event, arg1, arg2);
+        });
+      }
+    }
   }
 }
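The event wiring above is a straight re-emit: whatever arrives on the internal subscriber connection is replayed on the cluster-level emitter, with the sharded events only wired up when `isSharded` is set. A self-contained sketch of the same pattern (the emitter names are illustrative, not ioredis internals):

```typescript
import { EventEmitter } from "events";

// Illustrative stand-ins for the internal subscriber connection and the Cluster instance
const subscriberConnection = new EventEmitter();
const clusterEmitter = new EventEmitter();

// Re-emit two-argument events, just like the "smessage"/"smessageBuffer" loop above
for (const event of ["smessage", "smessageBuffer"]) {
  subscriberConnection.on(event, (channel, message) => {
    clusterEmitter.emit(event, channel, message);
  });
}

// A consumer only ever sees the cluster-level emitter
clusterEmitter.on("smessage", (channel, message) => {
  console.log(`${channel}: ${message}`);
});
subscriberConnection.emit("smessage", "channel{demo}:1", "hello"); // -> "channel{demo}:1: hello"
```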

lib/cluster/ClusterSubscriberGroup.ts (new file, +268)

import { Debug } from "../utils";
import ClusterSubscriber from "./ClusterSubscriber";
import Cluster from "./index";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";
import * as calculateSlot from "cluster-key-slot";

const debug = Debug("cluster:subscriberGroup");

/**
 * Redis distinguishes between "normal" and sharded Pub/Sub. With "normal" Pub/Sub, exactly one
 * ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards
 * messages between shards. However, this has scalability limitations, which is the reason why the
 * sharded Pub/Sub feature was added to Redis. With sharded Pub/Sub, each shard is responsible for
 * its own messages. Given that, we need at least one ClusterSubscriber per master endpoint/node.
 *
 * This class leverages the previously existing ClusterSubscriber by adding support for multiple such
 * subscribers, in alignment with the master nodes of the cluster. The ClusterSubscriber class was
 * extended in a non-breaking way to support this feature.
 */
export default class ClusterSubscriberGroup {
  private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
  private clusterSlots: string[][] = [];
  // Simple [min, max] slot ranges aren't enough because single slots can be migrated
  private subscriberToSlotsIndex: Map<string, number[]> = new Map();
  private channels: Map<number, Array<string | Buffer>> = new Map();

  /**
   * Register callbacks
   *
   * @param cluster
   */
  constructor(private cluster: Cluster) {
    cluster.on("+node", (redis) => {
      this._addSubscriber(redis);
    });

    cluster.on("-node", (redis) => {
      this._removeSubscriber(redis);
    });

    cluster.on("refresh", () => {
      this._refreshSlots(cluster);
    });
  }

  /**
   * Get the responsible subscriber.
   *
   * Returns null if no subscriber was found.
   *
   * @param slot
   */
  getResponsibleSubscriber(slot: number): ClusterSubscriber {
    const nodeKey = this.clusterSlots[slot][0];
    return this.shardedSubscribers.get(nodeKey);
  }

  /**
   * Adds channels for which this subscriber group is responsible
   *
   * @param channels
   */
  addChannels(channels: (string | Buffer)[]): number {
    const slot = calculateSlot(channels[0]);

    // Check that all channels belong to the same slot; otherwise reject the operation
    for (const c of channels) {
      if (calculateSlot(c) !== slot) return -1;
    }

    const currChannels = this.channels.get(slot);

    if (!currChannels) {
      this.channels.set(slot, channels);
    } else {
      this.channels.set(slot, currChannels.concat(channels));
    }

    return [...this.channels.values()].flatMap((v) => v).length;
  }

  /**
   * Removes channels for which the subscriber group is responsible
   *
   * @param channels
   */
  removeChannels(channels: (string | Buffer)[]): number {
    const slot = calculateSlot(channels[0]);

    // Check that all channels belong to the same slot; otherwise reject the operation
    for (const c of channels) {
      if (calculateSlot(c) !== slot) return -1;
    }

    const slotChannels = this.channels.get(slot);

    if (slotChannels) {
      const updatedChannels = slotChannels.filter((c) => !channels.includes(c));
      this.channels.set(slot, updatedChannels);
    }

    return [...this.channels.values()].flatMap((v) => v).length;
  }

  /**
   * Disconnect all subscribers
   */
  stop() {
    for (const s of this.shardedSubscribers.values()) {
      s.stop();
    }
  }

  /**
   * Start all not-yet-started subscribers
   */
  start() {
    for (const s of this.shardedSubscribers.values()) {
      if (!s.isStarted()) {
        s.start();
      }
    }
  }

  /**
   * Add a subscriber to the group of subscribers
   *
   * @param redis
   */
  private _addSubscriber(redis: any): ClusterSubscriber {
    const pool: ConnectionPool = new ConnectionPool(redis.options);

    if (pool.addMasterNode(redis)) {
      const sub = new ClusterSubscriber(pool, this.cluster, true);
      const nodeKey = getNodeKey(redis.options);
      this.shardedSubscribers.set(nodeKey, sub);
      sub.start();

      // Attempt to resubscribe the known channels in case the new node serves their slots
      this._resubscribe();
      this.cluster.emit("+subscriber");
      return sub;
    }

    return null;
  }

  /**
   * Removes a subscriber from the group
   *
   * @param redis
   */
  private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
    const nodeKey = getNodeKey(redis.options);
    const sub = this.shardedSubscribers.get(nodeKey);

    if (sub) {
      sub.stop();
      this.shardedSubscribers.delete(nodeKey);

      // Even though the subscriber to this node is going down, another subscriber might be
      // handling the same slots, so we need to attempt to subscribe the orphaned channels
      this._resubscribe();
      this.cluster.emit("-subscriber");
    }

    return this.shardedSubscribers;
  }

  /**
   * Refreshes the subscriber-related slot ranges.
   *
   * Returns false if no refresh was needed.
   *
   * @param cluster
   */
  private _refreshSlots(cluster: Cluster): boolean {
    // If there was an actual change, reassign the slot ranges
    if (this._slotsAreEqual(cluster.slots)) {
      debug("Nothing to refresh because the new cluster map is equal to the previous one.");
    } else {
      debug("Refreshing the slots of the subscriber group.");

      // Rebuild the slots index
      this.subscriberToSlotsIndex = new Map();

      for (let slot = 0; slot < cluster.slots.length; slot++) {
        const node: string = cluster.slots[slot][0];

        if (!this.subscriberToSlotsIndex.has(node)) {
          this.subscriberToSlotsIndex.set(node, []);
        }
        this.subscriberToSlotsIndex.get(node).push(Number(slot));
      }

      // Update the subscribers from the index
      this._resubscribe();

      // Update the cached slots map
      this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots));

      this.cluster.emit("subscribersReady");
      return true;
    }

    return false;
  }

  /**
   * Resubscribes to the previous channels
   *
   * @private
   */
  private _resubscribe() {
    if (this.shardedSubscribers) {
      this.shardedSubscribers.forEach((s: ClusterSubscriber, nodeKey: string) => {
        const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
        if (subscriberSlots) {
          // Mostly for debugging purposes
          s.associateSlotRange(subscriberSlots);

          // Resubscribe on the underlying connection
          subscriberSlots.forEach((ss) => {
            // Might return null while disconnected
            const redis = s.getInstance();
            const channels = this.channels.get(ss);

            if (channels && channels.length > 0) {
              // Try to subscribe now
              if (redis) {
                redis.ssubscribe(channels);

                // If the instance isn't ready yet, register the re-subscription for later
                redis.on("ready", () => {
                  redis.ssubscribe(channels);
                });
              }
            }
          });
        }
      });
    }
  }

  /**
   * Deep equality of the cluster slots objects
   *
   * @param other
   * @private
   */
  private _slotsAreEqual(other: string[][]) {
    if (this.clusterSlots === undefined) return false;
    return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
  }
}
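The group's channel bookkeeping is keyed by hash slot, computed with the `cluster-key-slot` package that the file already imports (CRC16 of the key, or of the `{...}` hash tag if present, modulo 16384). A small sketch of why the same-slot check in `addChannels`/`removeChannels` accepts some channel combinations and rejects others:

```typescript
import * as calculateSlot from "cluster-key-slot";

// Hash tags: only the text inside the first {...} is hashed,
// so these two channels are guaranteed to share a slot ...
console.log(calculateSlot("channel{my}:1") === calculateSlot("channel{my}:2")); // true

// ... while a different tag generally lands in a different slot
console.log(calculateSlot("channel{my}:1") === calculateSlot("channel{yours}:2")); // false for these tags

// This mirrors the check in addChannels()/removeChannels() above
const channels = ["channel{my}:1", "channel{my}:2"];
const slot = calculateSlot(channels[0]);
const sameSlot = channels.every((c) => calculateSlot(c) === slot);
console.log(sameSlot); // true
```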

lib/cluster/ConnectionPool.ts (+50 −18)

@@ -36,6 +36,55 @@ export default class ConnectionPool extends EventEmitter {
     return this.nodes[role][sampleKey];
   }

+  /**
+   * Add a master node to the pool
+   * @param node
+   */
+  addMasterNode(node: RedisOptions) {
+    const key = getNodeKey(node.options);
+    const redis: Redis = this.createRedisFromOptions(node, node.options.readOnly);
+
+    // Master nodes aren't read-only
+    if (!node.options.readOnly) {
+      this.nodes.all[key] = redis;
+      this.nodes.master[key] = redis;
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Creates a Redis connection instance from the node options
+   * @param node
+   * @param readOnly
+   */
+  createRedisFromOptions(node: RedisOptions, readOnly: boolean) {
+    const redis = new Redis(
+      defaults(
+        {
+          // Never try to reconnect when a node is lost.
+          // Instead, wait for a `MOVED` error and
+          // fetch the slots again.
+          retryStrategy: null,
+          // The offline queue should be enabled so that
+          // we don't need to wait for the `ready` event
+          // before sending commands to the node.
+          enableOfflineQueue: true,
+          readOnly: readOnly,
+        },
+        node,
+        this.redisOptions,
+        { lazyConnect: true }
+      )
+    );
+
+    return redis;
+  }
+
   /**
    * Find or create a connection to the node
    */
@@ -66,24 +115,7 @@ export default class ConnectionPool extends EventEmitter {
       }
     } else {
       debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
-      redis = new Redis(
-        defaults(
-          {
-            // Never try to reconnect when a node is lose,
-            // instead, waiting for a `MOVED` error and
-            // fetch the slots again.
-            retryStrategy: null,
-            // Offline queue should be enabled so that
-            // we don't need to wait for the `ready` event
-            // before sending commands to the node.
-            enableOfflineQueue: true,
-            readOnly: readOnly,
-          },
-          node,
-          this.redisOptions,
-          { lazyConnect: true }
-        )
-      );
+      redis = this.createRedisFromOptions(node, readOnly);
       this.nodes.all[key] = redis;
       this.nodes[readOnly ? "slave" : "master"][key] = redis;

lib/cluster/index.ts (+43 −2)

@@ -26,7 +26,7 @@ import ClusterSubscriber from "./ClusterSubscriber";
 import ConnectionPool from "./ConnectionPool";
 import DelayQueue from "./DelayQueue";
 import {
-  getConnectionName,
+  getConnectionName, getNodeKey,
   getUniqueHostnamesFromOptions,
   groupSrvRecords,
   NodeKey,
@@ -37,6 +37,7 @@ import {
   weightSrvRecords,
 } from "./util";
 import Deque = require("denque");
+import ClusterSubscriberGroup from "./ClusterSubscriberGroup";

 const debug = Debug("cluster");

@@ -96,6 +97,7 @@ class Cluster extends Commander {
   private delayQueue: DelayQueue = new DelayQueue();
   private offlineQueue = new Deque<OfflineQueueItem>();
   private subscriber: ClusterSubscriber;
+  private shardedSubscribers: ClusterSubscriberGroup;
   private slotsTimer: NodeJS.Timer;
   private reconnectTimeout: NodeJS.Timer;
   private isRefreshing = false;
@@ -115,13 +117,17 @@ class Cluster extends Commander {
   /**
    * Creates an instance of Cluster.
    */
+  // TODO: Add an option that enables or disables sharded PubSub
   constructor(startupNodes: ClusterNode[], options: ClusterOptions = {}) {
     super();
     EventEmitter.call(this);

     this.startupNodes = startupNodes;
     this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);

+    if (this.options.shardedSubscribers)
+      this.shardedSubscribers = new ClusterSubscriberGroup(this);
+
     if (
       this.options.redisOptions &&
       this.options.redisOptions.keyPrefix &&
@@ -268,6 +274,10 @@ class Cluster extends Commander {
         }
       });
       this.subscriber.start();
+
+      if (this.options.shardedSubscribers) {
+        this.shardedSubscribers.start();
+      }
     })
     .catch((err) => {
       this.setStatus("close");
@@ -296,6 +306,11 @@ class Cluster extends Commander {
     this.clearNodesRefreshInterval();

     this.subscriber.stop();
+
+    if (this.options.shardedSubscribers) {
+      this.shardedSubscribers.stop();
+    }
+
     if (status === "wait") {
       this.setStatus("close");
       this.handleCloseEvent();
@@ -321,6 +336,11 @@ class Cluster extends Commander {

     this.subscriber.stop();

+    if (this.options.shardedSubscribers) {
+      this.shardedSubscribers.stop();
+    }
+
     if (status === "wait") {
       const ret = asCallback(Promise.resolve<"OK">("OK"), callback);

@@ -547,7 +567,28 @@ class Cluster extends Commander {
       Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
       Command.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)
     ) {
-      redis = _this.subscriber.getInstance();
+      if (
+        _this.options.shardedSubscribers &&
+        (command.name === "ssubscribe" || command.name === "sunsubscribe")
+      ) {
+        const sub: ClusterSubscriber =
+          _this.shardedSubscribers.getResponsibleSubscriber(targetSlot);
+        let status = -1;
+
+        if (command.name === "ssubscribe")
+          status = _this.shardedSubscribers.addChannels(command.getKeys());
+
+        if (command.name === "sunsubscribe")
+          status = _this.shardedSubscribers.removeChannels(command.getKeys());
+
+        if (status !== -1) {
+          redis = sub.getInstance();
+        } else {
+          command.reject(
+            new AbortError(
+              "Can't add or remove the given channels. Are they in the same slot?"
+            )
+          );
+        }
+      } else {
+        redis = _this.subscriber.getInstance();
+      }
+
       if (!redis) {
         command.reject(new AbortError("No subscriber for the cluster"));
         return;
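Condensed, the new branch implements a small routing rule: sharded subscribe commands go to the subscriber that owns the command's slot, everything else stays on the shared subscriber. The following model is illustrative only (simplified types, not the actual ioredis internals):

```typescript
// Illustrative model of the routing decision above; not the actual ioredis code
interface SubscriberLike {
  getInstance(): unknown;
}

function pickSubscriber(
  commandName: string,
  targetSlot: number,
  shardedEnabled: boolean,
  group: { getResponsibleSubscriber(slot: number): SubscriberLike },
  sharedSubscriber: SubscriberLike
): unknown {
  const isShardedPubSubCmd =
    commandName === "ssubscribe" || commandName === "sunsubscribe";

  if (shardedEnabled && isShardedPubSubCmd) {
    // One subscriber per master node: pick the owner of the command's slot
    return group.getResponsibleSubscriber(targetSlot).getInstance();
  }

  // subscribe/psubscribe/etc. keep using the single shared subscriber
  return sharedSubscriber.getInstance();
}
```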

test/cluster/basic.ts (+22 −0)

@@ -1,6 +1,7 @@
 import { expect } from "chai";
 import Redis, { Cluster } from "../../lib";

+
 const masters = [30000, 30001, 30002];
 const replicas = [30003, 30004, 30005];

@@ -148,4 +149,25 @@ describe("cluster", () => {
       expect(await cluster2.get("prefix:foo")).to.eql("bar");
     });
   });
+
+  describe("Test if the client performs hash-based sharding for simple set operations", () => {
+    it("works when you don't get MOVED error responses", async () => {
+      // Verify that the cluster is configured with 3 master nodes
+      const cluster: Cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);
+      cluster.on("ready", () => {
+        expect(cluster.nodes("master").length).to.eql(3);
+      });
+
+      const keys = ["channel:test:3", "channel:test:2", "channel:test:0"];
+      for (const k of keys) {
+        const status: string = await cluster.set(k, "Test status per node");
+        expect(status).to.eql("OK");
+        const value: string = await cluster.get(k);
+        expect(value).to.eql("Test status per node");
+      }
+    });
+  });
 });
New test file for ClusterSubscriberGroup (+240)

import { expect } from "chai";
import Redis, { Cluster } from "../../lib";

const host = "127.0.0.1";
const masters = [30000, 30001, 30002];
const port: number = masters[0];

/**
 * Wait for a specified time
 *
 * @param ms
 */
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

describe("cluster:ClusterSubscriberGroup", () => {
  it("should unsubscribe from the given channel", async () => {
    const cluster: Cluster = new Cluster([{ host: host, port: port }], { shardedSubscribers: true });

    // Subscribe to the three channels
    cluster.ssubscribe("channel:1:{1}", "channel:2:{1}", "channel:3:{1}").then((count: number) => {
      console.log("Subscribed to 3 channels.");
      expect(count).to.equal(3);
    });

    // Publish a message to one of the channels
    cluster.spublish("channel:2:{1}", "This is a test message to channel 2.").then((value: number) => {
      console.log("Published a message to channel:2:{1} and expect one subscriber.");
      expect(value).to.be.eql(1);
    });

    await sleep(500);

    // Unsubscribe from one of the channels
    cluster.sunsubscribe("channel:2:{1}").then((count: number) => {
      console.log("Unsubscribed from channel:2:{1}.");
      expect(count).to.equal(2);
    });

    await sleep(500);

    // Publish a message to the channel from which we unsubscribed
    cluster.spublish("channel:2:{1}", "This is a test message to channel 2.").then((value: number) => {
      console.log("Published a second message to channel:2:{1} and expect nobody to be listening.");
      expect(value).to.be.eql(0);
    });

    await sleep(1000);
    cluster.disconnect();
  });

  it("works when ssubscribe only accepts keys that map to the same slot", async () => {
    const cluster: Cluster = new Cluster([{ host: host, port: port }], { shardedSubscribers: true });

    // Register the callback
    cluster.on("smessage", (channel, message) => {
      console.log(message);
      expect(message.startsWith("This is a test message")).to.be.true;
    });

    // Subscribing to channels on different slots must fail
    cluster.ssubscribe("channel{my}:1", "channel{yours}:2").then((count: number) => {
      // Should not be called
      expect(true).to.equal(false);
    }).catch((err) => {
      expect(err.toString().includes("CROSSSLOT Keys in request don't hash to the same slot")).to.be.true;
    });

    // Subscribe to the channels on the same slot
    cluster.ssubscribe("channel{my}:1", "channel{my}:2").then((count: number) => {
      console.log(count);
      expect(count).to.equal(2);
    }).catch((err) => {
      expect(true).to.equal(false);
    });

    // Subscribe once again on the other slot
    cluster.ssubscribe("channel{yours}:2").then((count: number) => {
      console.log(count);
      expect(count).to.equal(1);
    }).catch((err) => {
      expect(true).to.equal(false);
    });

    // Publish messages
    cluster.spublish("channel{my}:1", "This is a test message to my first channel.").then((value: number) => {
      console.log("Published a message to channel{my}:1");
      expect(value).to.be.eql(1);
    });

    cluster.spublish("channel{my}:2", "This is a test message to my second channel.").then((value: number) => {
      console.log("Published a message to channel{my}:2");
      expect(value).to.be.eql(1);
    });

    cluster.spublish("channel{yours}:2", "This is a test message to your second channel.").then((value: number) => {
      console.log("Published a message to channel{yours}:2");
      expect(value).to.be.eql(1);
    });

    // Give it some time to process messages and then disconnect
    await sleep(1000);
    cluster.disconnect();
  });

  it("works when you can receive published messages on all primary nodes after having subscribed", async () => {
    // 0. Prepare the publisher and the subscriber
    const publisher: Cluster = new Cluster([{ host: host, port: port }]);

    // -- Publisher
    // Verify that the cluster is configured with 3 master nodes
    publisher.on("ready", () => {
      expect(publisher.nodes("master").length).to.eql(3);
    });

    // -- Subscriber
    const subscriber: Cluster = new Cluster([{ host: host, port: port }], { shardedSubscribers: true });
    let totalNumMessages = 0;

    // Register the subscriber callback
    subscriber.on("smessage", (channel, message) => {
      console.log(message);
      expect(message.startsWith("This is a test message")).to.eql(true);
      expect(message.endsWith(channel + ".")).to.eql(true);
      totalNumMessages++;
      expect(totalNumMessages).to.lte(3);
    });

    // Verify that we did not get more than 3 subscribers
    let numSubs = 0;
    subscriber.on("+subscriber", () => {
      numSubs++;
      expect(numSubs).to.lte(3);
    });

    // 1. Construct 3 channel names, whereby the first one is expected to land on node 1,
    //    the second one on node 2, and so on
    const channels = ["channel:test:3", "channel:test:2", "channel:test:0"];

    for (const c of channels) {
      console.log("Trying to publish to channel:", c);

      // 2. Subscribe to the channel
      await subscriber.ssubscribe(c);

      // 3. Publish a message and expect exactly one subscriber to receive it
      const numSubscribers = await publisher.spublish(c, "This is a test message " + c + ".");
      expect(numSubscribers).to.eql(1);
    }

    // Give it some time to process messages and then disconnect
    await sleep(1000);
    subscriber.disconnect();
  });

  it("receives messages on the channel after the slot was moved", async () => {
    // The hash slot of interest
    const slot = 2318;
    const channel = "channel:test:3";

    // Used as control connections for orchestrating the slot migration
    const source: Redis = new Redis({ host: host, port: 30000 });
    const target: Redis = new Redis({ host: host, port: 30001 });

    // Initialize the publisher cluster connection and verify that the slot is on node 1
    const publisher: Cluster = new Cluster([{ host: host, port: port }]);

    publisher.on("ready", () => {
      expect(publisher.slots[slot][0]).eql("127.0.0.1:30000");
    });

    // Initialize the subscriber cluster connection and verify that the slot is on node 1
    const subscriber: Cluster = new Cluster([{ host: host, port: port }], { shardedSubscribers: true });

    subscriber.on("ready", () => {
      expect(subscriber.slots[slot][0]).eql("127.0.0.1:30000");
    });

    // The subscription callback. We should receive both messages.
    let totalNumMessages = 0;
    subscriber.on("smessage", (channel, message) => {
      totalNumMessages++;

      if (totalNumMessages == 1) {
        console.log("Received the first message");
        expect(message.includes("#1")).to.eql(true);
      }

      if (totalNumMessages == 2) {
        console.log("Received the second message");
        expect(message.includes("#2")).to.eql(true);
      }
    });

    // Subscribe and then publish
    await subscriber.ssubscribe(channel);
    await publisher.spublish(channel, "This is a test message #1 to slot " + slot + " on channel " + channel + ".");

    // Get the node ids of the source and the target node
    const nodes = await source.cluster("SLOTS");
    const sourceNode = nodes[0][2][2];
    const targetNode = nodes[1][2][2];

    // Migrate the slot
    console.log(`Migrating slot ${slot} to ${targetNode}`);
    let status = "";
    status = await target.cluster("SETSLOT", slot, "IMPORTING", targetNode);
    expect(status).to.eql("OK");
    status = await source.cluster("SETSLOT", slot, "MIGRATING", sourceNode);
    expect(status).to.eql("OK");
    status = await target.cluster("SETSLOT", slot, "NODE", targetNode);
    expect(status).to.eql("OK");
    status = await source.cluster("SETSLOT", slot, "NODE", targetNode);
    expect(status).to.eql("OK");

    // Trigger a topology update on the subscriber. This needs at least one MOVED response.
    // TODO: What if there is no traffic on the cluster connection?
    status = await subscriber.set("match_slot{" + channel + "}", "channel 3");
    expect(status).to.eql("OK");
    expect(subscriber.slots[slot][0]).eql("127.0.0.1:30001");

    // Wait a bit to let the subscriber resubscribe to the previous channels
    await sleep(1000);

    const numSubscribers = await publisher.spublish(channel, "This is a test message #2 to slot " + slot + " on channel " + channel + ".");
    expect(publisher.slots[slot][0]).eql("127.0.0.1:30001");
    expect(numSubscribers).to.eql(1);
  });
});
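The tests above pace themselves with fixed `sleep()` calls. Where that proves flaky, a deterministic alternative is to await the event itself with Node's `events.once` helper; a sketch, assuming the same `subscriber` and `publisher` instances as in the tests:

```typescript
import { once } from "events";
import { Cluster } from "../../lib";

// Resolves with [channel, message] when the next smessage arrives,
// instead of sleeping for a fixed amount of time
function nextShardedMessage(subscriber: Cluster): Promise<[string, string]> {
  return once(subscriber, "smessage") as Promise<[string, string]>;
}

// Usage sketch:
// const pending = nextShardedMessage(subscriber);
// await publisher.spublish("channel:test:3", "hello");
// const [channel, message] = await pending;
```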

test/functional/cluster/spub_ssub.ts (+12 −11)

@@ -15,14 +15,16 @@ describe("cluster:spub/ssub", function () {
       ];
     }
   };
-  const node1 = new MockServer(30001, handler);
-  new MockServer(30002, handler);
-
-  const options = [{ host: "127.0.0.1", port: "30001" }];
-  const ssub = new Cluster(options);
+  new MockServer(30001, handler);
+  // Node 2 is responsible for the vast majority of slots
+  const node2 = new MockServer(30002, handler);
+  const startupNodes = [{ host: "127.0.0.1", port: 30001 }];
+  const clusterOptions = { shardedSubscribers: true };
+  const ssub = new Cluster(startupNodes, clusterOptions);

   ssub.ssubscribe("test cluster", function () {
-    node1.write(node1.findClientByName("ioredis-cluster(subscriber)"), [
+    const clientSocket = node2.findClientByName("ioredis-cluster(ssubscriber)");
+    node2.write(clientSocket, [
       "smessage",
       "test shard channel",
       "hi",
@@ -44,7 +46,7 @@ describe("cluster:spub/ssub", function () {
   };
   new MockServer(30001, handler);

-  const ssub = new Cluster([{ port: "30001" }]);
+  const ssub = new Cluster([{ port: "30001" }], { shardedSubscribers: true });

   ssub.ssubscribe("test cluster", function () {
     ssub.set("foo", "bar").then((res) => {
@@ -63,15 +65,15 @@ describe("cluster:spub/ssub", function () {
     }
     if (argv[0] === "ssubscribe") {
       expect(c.password).to.eql("abc");
-      expect(getConnectionName(c)).to.eql("ioredis-cluster(subscriber)");
+      expect(getConnectionName(c)).to.eql("ioredis-cluster(ssubscriber)");
     }
     if (argv[0] === "cluster" && argv[1] === "SLOTS") {
       return [[0, 16383, ["127.0.0.1", 30001]]];
     }
   };
   new MockServer(30001, handler);

-  const ssub = new Redis.Cluster([{ port: "30001", password: "abc" }]);
+  const ssub = new Redis.Cluster([{ port: 30001, password: "abc" }], { shardedSubscribers: true });

   ssub.ssubscribe("test cluster", function () {
     ssub.disconnect();
@@ -87,8 +89,7 @@ describe("cluster:spub/ssub", function () {
       return [argv[0], argv[1]];
     }
   });
-  const client = new Cluster([{ host: "127.0.0.1", port: "30001" }]);
-
+  const client = new Cluster([{ host: "127.0.0.1", port: 30001 }], { shardedSubscribers: true });
   client.ssubscribe("test cluster", function () {
     const stub = sinon
       .stub(Redis.prototype, "ssubscribe")
