Skip to content

Commit

Permalink
bug #54167 [Messenger] [Amqp] Handle AMQPConnectionException when pub…
Browse files Browse the repository at this point in the history
…lishing a message. (jwage)

This PR was squashed before being merged into the 6.4 branch.

Discussion
----------

[Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.

| Q             | A
| ------------- | ---
| Branch?       | 6.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Issues        | Fix #36538 Fix #48241
| License       | MIT

If you have a message handler that dispatches messages to another queue, you can encounter `AMQPConnectionException` with the message "Library error: a SSL error occurred" or "a socket error occurred"  depending on if you are using tls or not or if you are running behind a load balancer or not.

You can manually reproduce this issue by dispatching a message where the handler then dispatches another message to a different queue, then go to rabbitmq admin and close the connection manually, then dispatch another message and when the message handler goes to dispatch the other message, you will get this exception:

```
a socket error occurred
#0 /vagrant/vendor/symfony/amqp-messenger/Transport/AmqpTransport.php(60): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender->send()
#1 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(62): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport->send()
#2 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle()
#3 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(61): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle()
#4 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle()
#5 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle()
#6 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle()
#7 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle()
#8 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch()
#9 /vagrant/src/Messenger/MessageBus.php(37): Symfony\Component\Messenger\TraceableMessageBus->dispatch()
#10 /vagrant/vendor/symfony/mailer/Mailer.php(66): App\Messenger\MessageBus->dispatch()
#11 /vagrant/src/Mailer/Mailer.php(83): Symfony\Component\Mailer\Mailer->send()
#12 /vagrant/src/Mailer/Mailer.php(96): App\Mailer\Mailer->send()
#13 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(118): App\Mailer\Mailer->sendEmail()
#14 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(72): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->handle()
#15 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(152): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->__invoke()
#16 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(91): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->callHandler()
#17 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(71): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->handle()
#18 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle()
#19 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(68): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle()
#20 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle()
#21 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle()
#22 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle()
#23 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle()
#24 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch()
#25 /vagrant/vendor/symfony/messenger/RoutableMessageBus.php(54): Symfony\Component\Messenger\TraceableMessageBus->dispatch()
#26 /vagrant/vendor/symfony/messenger/Worker.php(162): Symfony\Component\Messenger\RoutableMessageBus->dispatch()
#27 /vagrant/vendor/symfony/messenger/Worker.php(109): Symfony\Component\Messenger\Worker->handleMessage()
#28 /vagrant/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php(238): Symfony\Component\Messenger\Worker->run()
#29 /vagrant/vendor/symfony/console/Command/Command.php(326): Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute()
#30 /vagrant/vendor/symfony/console/Application.php(1096): Symfony\Component\Console\Command\Command->run()
#31 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(126): Symfony\Component\Console\Application->doRunCommand()
#32 /vagrant/vendor/symfony/console/Application.php(324): Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand()
#33 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(80): Symfony\Component\Console\Application->doRun()
#34 /vagrant/vendor/symfony/console/Application.php(175): Symfony\Bundle\FrameworkBundle\Console\Application->doRun()
#35 /vagrant/vendor/symfony/runtime/Runner/Symfony/ConsoleApplicationRunner.php(49): Symfony\Component\Console\Application->run()
#36 /vagrant/vendor/autoload_runtime.php(29): Symfony\Component\Runtime\Runner\Symfony\ConsoleApplicationRunner->run()
#37 /vagrant/bin/console(11): require_once('...')
#38 {main}
```

TODO:

- [x] Add test for retry logic when publishing messages

Commits
-------

f123370 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
  • Loading branch information
chalasr committed Mar 5, 2024
2 parents 0523300 + f123370 commit e43b198
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
);
}

public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->exactly(2))
->method('publish')
->willReturnOnConsecutiveCalls(
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
null
);

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->exactly(2))
->method('publish')
->willReturnOnConsecutiveCalls(
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
null
);

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', [], 5000);
}

public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$exception = new \AMQPConnectionException('a socket error occurred');

$amqpExchange->expects($this->exactly(4))
->method('publish')
->willReturnOnConsecutiveCalls(
$this->throwException($exception),
$this->throwException($exception),
$this->throwException($exception),
$this->throwException($exception),
);

self::expectException($exception::class);
self::expectExceptionMessage($exception->getMessage());

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
}

if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);

return;
}
return;
}

$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
});
}

/**
Expand Down Expand Up @@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange
private function clearWhenDisconnected(): void
{
if (!$this->channel()->isConnected()) {
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
$this->amqpQueues = [];
$this->clear();
}
}

private function clear(): void
{
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
$this->amqpQueues = [];
}

private function getDefaultPublishRoutingKey(): ?string
{
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
Expand All @@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
}

private function withConnectionExceptionRetry(callable $callable): void
{
$maxRetries = 3;
$retries = 0;

retry:
try {
$callable();
} catch (\AMQPConnectionException $e) {
if (++$retries <= $maxRetries) {
$this->clear();

goto retry;
}

throw $e;
}
}
}

0 comments on commit e43b198

Please sign in to comment.