Skip to content

Commit

Permalink
[Messenger][Amqp] Handle AMQPConnectionException when publishing a me…
Browse files Browse the repository at this point in the history
…ssage.
  • Loading branch information
jwage committed Mar 5, 2024
1 parent 182e93e commit 0ea9ca5
Showing 1 changed file with 44 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,15 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
return;
}

$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
$this->withConnectionExceptionRetry(function() use ($body, $amqpStamp, $headers) {
$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
});
}

/**
Expand All @@ -320,13 +322,15 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?Amq

$this->setupDelay($delay, $routingKey, $isRetryAttempt);

$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers,
$amqpStamp
);
$this->withConnectionExceptionRetry(function() use ($body, $delay, $routingKey, $isRetryAttempt, $headers, $amqpStamp) {
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers,
$amqpStamp
);
});
}

private function publishOnExchange(\AMQPExchange $exchange, string $body, ?string $routingKey = null, array $headers = [], ?AmqpStamp $amqpStamp = null): void
Expand Down Expand Up @@ -545,11 +549,16 @@ public function exchange(): \AMQPExchange
private function clearWhenDisconnected(): void
{
if (!$this->channel()->isConnected()) {
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
$this->amqpQueues = [];
$this->clear();
}
}

private function clear(): void
{
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
$this->amqpQueues = [];
}

private function getDefaultPublishRoutingKey(): ?string
{
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
Expand All @@ -566,4 +575,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
}

private function withConnectionExceptionRetry(callable $callable): void
{
$maxRetries = 3;
$retries = 0;

retry:
try {
$callable();
} catch (\AMQPConnectionException $e) {
if (++$retries <= $maxRetries) {
$this->clear();

goto retry;
}

throw $e;
}
}
}

0 comments on commit 0ea9ca5

Please sign in to comment.