Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Websockets Broker relay supporting a cluster of STOMP endpoint addresses [SPR-12452] #17057

Closed
spring-projects-issues opened this issue Nov 19, 2014 · 20 comments
Assignees
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) status: backported An issue that has been backported to maintenance branches type: enhancement A general enhancement
Milestone

Comments

@spring-projects-issues
Copy link
Collaborator

spring-projects-issues commented Nov 19, 2014

Biju Kunjummen opened SPR-12452 and commented

Spring Websocket supports client STOMP subscriptions through a Message broker relay component which in turn talks to full fledged STOMP brokers like RabbitMQ and ActiveMQ.

RabbitMQ supports a clustered deployment, for native AMQP messaging the client connection factory has a way to specify all the hosts in the cluster(org.springframework.amqp.rabbit.connection.AbstractConnectionFactory#setAddresses). To support a similar set-up for a cluster of RabbitMQ servers over STOMP, an option will be to use a loadbalancer, however if support can be natively provided within Spring Websocket relay that will be even better.

Another potential option is to support Stomp from the client but then turn that to native AMQP or JMS


Affects: 4.1.2

Issue Links:

Referenced from: commits 88a17a4, d512cca

Backported to: 4.3.15

6 votes, 15 watchers

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

Biju Kunjummen, after taking a closer look, this appears to be a little more involved than anticipated since it's not merely about the initial connect but also about reconnect logic on which we do rely to the shared "system" connection. At present the TcpClient from Reactor is built to work with a single address. Yet the Reconnect type it accepts in one of its open method variants allows returning a different address. I am reluctant to proceed until there is some clarity around what changes might be made in Reactor related to the use case. I'm going to push this back to 4.1.4 to allow more time. We might still do something in that time frame or alternatively if we need a Reactor 2.0 release this may get pushed to 4.2.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

I've opened this ticket in the reactor project reactor/reactor#412. At this point a 4.2 target seems most likely.

@spring-projects-issues
Copy link
Collaborator Author

Biju Kunjummen commented

Thanks for following up Rossen. The timeline works, the client has already decided to go for a hardware loadbalancer for the RabbitMQ STOMP cluster and the native load balancing approach with Spring Websockets/reactor a good to have for them.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

Unfortunately the Reactor 1.x - 2.0 upgrade happened rather late in the 4.2 cycle so we'll need to postpone this further.

@spring-projects-issues
Copy link
Collaborator Author

Petr Kukrál commented

Rossen Stoyanchev, are there any news about this ticket? I look at the related jira ticket in the reactor project and it seems closed without resolution. Our team try find out solution how to horizonally scale rabbitMQ with websocket connection over stomp. The main issue is that we have several tomcat instances but only one rabbitMQ instance and we need to feed all clients with same messages over websockets. The situation is that if any of tomcat get message from shared queue (implemented via another rabbitMQ cluster in HA) and after processing it to final state than publishes it to all clients.

Our idea of scaling is that each tomcat will connect to one of the rabbitMQ instance from cluster during startup and later when clients want to create WS connection then tomcat will create WS connection via dedicated rabbitMQ from cluster. This is not an issue. But once any of tomcat get message then it needs to publish it to whole rabbitMQ cluster and not only to one that is connected to tomcat. We don't need to reconnect clients to another rabbitMQ in cluster. They can be aligned to rabbitMQ based on tomcat. Clients requests to tomcats are load balanced.

Would you mind that it is possible with/without changes in reactor project and made changes only in spring websocket project?

@spring-projects-issues
Copy link
Collaborator Author

Filip Hrisafov commented

A small suggestion for this. It is not that difficult to implement a standalone implementation of a Reactor2TcpClient that would support using Supplier<InetSocketAddress> instead of just the one address?

I managed to do something for my by using Spec.TcpClientSpec#connect(Supplier<InetSocketAddress>). If this was exposed in a constructor then one can really easily implement something like the ActiveMQ failover protocol. I am also open in providing a patch for this if you are willing to accept it for the 4.3.x branch. I am not sure if it is the same for the master (as the reactor implementation there is different).

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

Filip Hrisafov, an extra constructor should be no problem for a 4.3 backport, but we had this issue which I don't believe was fixed in the 2.0.x line. Have you been able to confirm if the issue affects your experiment? If it does, as I suspect is the case, then we can only work on support for this in 5.x.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

I've created Reactor Netty #302 against the current Reactor Netty project.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

Filip Hrisafov based discussion under Reactor Netty #302, I see now that the original issue (based on Reactor Net 2.0.x) was related to the Reconnect strategy but you have found a different way by passing a Supplier<InteSocketAddress> at construction time. Feel free to submit a patch based on your suggestion.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

I'm scheduling for 5.0.5 and 4.3.15 since it sounds like this should work. We just need to confirm it works as expected, and possibly provide extra constructor options to make it a little easier.

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

There is now a new Reactor2TcpClient constructor (in 4.3.x) that accepts a Supplier<InetSocketAddress> along with a config option to plug in the client. There is an example in the docs.

Note that in master (5.x) the ReactorNettyTcpClient already has a constructor that takes a Consumer<ClientOptions.Builder<?>>, so all that needed to be done is expose the config option and update the docs.

@spring-projects-issues
Copy link
Collaborator Author

Filip Hrisafov commented

Thanks a lot for adding this Rossen. The documentation is amazing :). I know it would help a lot of people

@spring-projects-issues
Copy link
Collaborator Author

Rossen Stoyanchev commented

No worries, and thanks for the tip!

@spring-projects-issues
Copy link
Collaborator Author

Petr Kukrál commented

Thanks a lot too. This will help us with proper load balancing of the clients :)

@spring-projects-issues
Copy link
Collaborator Author

luisalves00 commented

Hi,

I have something like this:

@Override
public void configureMessageBroker(final MessageBrokerRegistry config) {
// Artemis -> tcp://0.0.0.0:61613
config.enableStompBrokerRelay("/topic").setTcpClient(createTcpClient());
config.setApplicationDestinationPrefixes("/app");
}

private ReactorNettyTcpClient<byte[]> createTcpClient() {
	final List<InetSocketAddress> addressList = new ArrayList<>();
	addressList.add(new InetSocketAddress("192.168.0.1", 61613));
	addressList.add(new InetSocketAddress("192.168.0.2", 61613));
	addressList.add(new InetSocketAddress("192.168.0.3", 61613));
	addressList.add(new InetSocketAddress("192.168.0.4", 61613));
	final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
	final Consumer<ClientOptions.Builder<?>> builderConsumer = b -> {
		b.connectAddress(() -> { return addresses.getNext(); });
	};
	return new ReactorNettyTcpClient<byte[]>(builderConsumer, new StompReactorNettyCodec()); 
}

But I think "192.168.0.1" will take all the load until is down.
Then Message Broker Relay will move to the next IP.
Am I wrong?

@spring-projects-issues
Copy link
Collaborator Author

luisalves00 commented

I was wrong. The round robin works fine. Each new consumer will be connected to the next address.

@spring-projects-issues
Copy link
Collaborator Author

Alan So commented

I find that the version 5.1 has removed public the constructor "ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer,public ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer, ReactorNettyCodec<P> codec)". Could you please help to merge it into 5.1?

@spring-projects-issues
Copy link
Collaborator Author

spring-projects-issues commented Nov 23, 2018

Rossen Stoyanchev commented

Infinity821 see #22055.

@PaulGobin
Copy link

PaulGobin commented Oct 29, 2020

Hey guys @spring-issuemaster @rstoyanchev I realize that this is an old post, however, I was wondering how do I use this code? It seem like "ClientOptions" was removed? I am using SpringBoot 2.3.4 and would like to use a list of clustered broker addresses.

Any help will be much appreciated.

private ReactorNettyTcpClient<byte[]> createTcpClient() {
	final List<InetSocketAddress> addressList = new ArrayList<>();
	addressList.add(new InetSocketAddress("192.168.0.1", 61613));
	addressList.add(new InetSocketAddress("192.168.0.2", 61613));
	addressList.add(new InetSocketAddress("192.168.0.3", 61613));
	addressList.add(new InetSocketAddress("192.168.0.4", 61613));
	final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
	final Consumer<**ClientOptions**.Builder<?>> builderConsumer = b -> {
		b.connectAddress(() -> { return addresses.getNext(); });
	};
	return new ReactorNettyTcpClient<byte[]>(builderConsumer, new StompReactorNettyCodec()); 
}

Thanks in advance.

@kmandalas
Copy link

kmandalas commented Nov 26, 2020

@PaulGobin I suppose what is mentioned here: https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-stomp-handle-broker-relay-configure does not cover you.

I also have similar situation but with the following difference (I think): in my case I have an ArtemisMQ 2-node Cluster in active/passive mode.

@spring-issuemaster please advise, there is lack of documentation/examples IMO for such cases. I do not think such should be subject to StackOverflow questions so I took the liberty to open new issue: #26169

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) status: backported An issue that has been backported to maintenance branches type: enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

4 participants