Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] MQTT RMS consumer: instead of deleting/creating, create a u… #5017

Merged
merged 3 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 20 additions & 14 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,26 +1439,32 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
}

// Delete the old (legacy) consumer, from v2.10.10 and before.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
if _, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName); isErrorOtherThan(err, JSConsumerNotFoundErr) {
levb marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

// Using ephemeral consumer is too risky because if this server were to be
// disconnected from the rest for few seconds, then the leader would remove
// the consumer, so even after a reconnect, we would no longer receive
// retained messages. Delete any existing durable that we have for that
// and recreate here.
// The name for the durable is $MQTT_rmsgs_<server name hash> (which is jsa.id)
rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
// If error other than "not found" then fail, otherwise proceed with creating
// the durable consumer.
if _, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return nil, err
}
// retained messages.
//
// So we use a durable consumer, and create a new one each time we start.
// The old one should expire and get deleted due to inactivity. The name for
// the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just
// for readability.
rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really durable, just the name, and we only really expect it to live for the lifetime of the running server.


ccfg := &CreateConsumerRequest{
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
InactiveThreshold: 5 * time.Minute,
},
}
if _, err := jsa.createConsumer(ccfg); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,8 +3026,10 @@ func TestMQTTCluster(t *testing.T) {
for _, sn := range streams {
cl.waitOnStreamLeader(globalAccountName, sn)
}
cl.waitOnConsumerLeader(globalAccountName, mqttRetainedMsgsStreamName, "$MQTT_rmsgs_esFhDys3")
cl.waitOnConsumerLeader(globalAccountName, mqttRetainedMsgsStreamName, "$MQTT_rmsgs_z3WIzPtj")
// <>/<> TODO: need to find a way to wait for the consumer
levb marked this conversation as resolved.
Show resolved Hide resolved
// leader, but the names are unique and not predictable.
// cl.waitOnConsumerLeader(globalAccountName, mqttRetainedMsgsStreamName, "$MQTT_rmsgs_esFhDys3")
// cl.waitOnConsumerLeader(globalAccountName, mqttRetainedMsgsStreamName, "$MQTT_rmsgs_z3WIzPtj")
}
})
}
Expand Down