Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: symfony/messenger
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v7.1.9
Choose a base ref
...
head repository: symfony/messenger
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v7.2.0
Choose a head ref

Commits on Jun 20, 2024

  1. Prefix all sprintf() calls

    derrabus committed Jun 20, 2024
    Copy the full SHA
    76a2cdf View commit details

Commits on Jun 24, 2024

  1. chore: CS fixes

    keradus authored and nicolas-grekas committed Jun 24, 2024
    Copy the full SHA
    99c25b2 View commit details

Commits on Jun 25, 2024

  1. Copy the full SHA
    77ca40f View commit details
  2. feature #57426 [Messenger] Add --format option to the `messenger:st…

    …ats` command (xvilo)
    
    This PR was squashed before being merged into the 7.2 branch.
    
    Discussion
    ----------
    
    [Messenger] Add `--format` option to the `messenger:stats` command
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | yes <!-- please update src/**/CHANGELOG.md files -->
    | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
    | Issues        | Fix #48583 <!-- prefix each issue number with "Fix #", no need to create an issue if none exists, explain below instead -->
    | License       | MIT
    
    As requested in #48583 add a way to output different in formats for `messenger:stats` command. This can be more easily used in, for example, `jq` or with external automations/scripts and such.
    
    Considerations I made:
    - To not, yet, make different classes for the output. In case a new output format is added this might be handy.
    - To not use an enum for output format, do we want this?
    - To ignore warnings for now, except for the `uncountable_transports`. If we want to warning in there, what format?
    
    Commits
    -------
    
    0e9f4588ec [Messenger] Add `--format` option to the `messenger:stats` command
    fabpot committed Jun 25, 2024
    Copy the full SHA
    a95a633 View commit details
  3. Copy the full SHA
    0e2bae3 View commit details

Commits on Jun 26, 2024

  1. Copy the full SHA
    f86150c View commit details

Commits on Jun 28, 2024

  1. feature #57507 [Messenger] Introduce #[AsMessage] attribute for mes…

    …sage routing (pounard)
    
    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    [Messenger] Introduce `#[AsMessage]` attribute for message routing
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | yes
    | Deprecations? | no
    | Issues        | Fix #57506
    | License       | MIT
    
    Basic implementation of #57506.
    
    * Adds the `Symfony\Component\Messenger\Attribute\AsMessage` attribute, with the `$transport` parameter which can be `string` or `array`.
    * Implement runtime routing in `Symfony\Component\Messenger\Transport\Sender\SendersLocator`.
    
    Rationale:
    
    * Messages are not services, it cannot be computed during container compilation.
    * Reflection is very fast, it shouldn't be significant for performances, yet I don't have measured it yet.
    * YAML configuration and `Symfony\Component\Messenger\Stamp\TransportNamesStamp` will always override the attribute values, allowing users to change hardcoded routing on a per-environment basis.
    * This is the simplest implementation I could think of for discussion.
    
    Links and references:
    
    * symfony/symfony#33912 where the discussion started, 5 years ago.
    * symfony/symfony#49143 closed PR that was doing the same, but at compile time, rejected because the actual doctrine is that messages should never be services.
    * symfony/symfony#41179 is stilled opened, and awaiting for modifications, but it was written for an earlier version of Symfony and is inactive for a year or so, yet messenger code has evolved since, and this PR will never merge as-is, it requires to be fully rewrote, reason why I opened this new one.
    
    Commits
    -------
    
    d65284239f feature #57506 [Messenger] introduce AsMessage attribute for message routing
    nicolas-grekas committed Jun 28, 2024
    Copy the full SHA
    7d6afa4 View commit details
  2. Merge branch '7.1' into 7.2

    * 7.1: (21 commits)
      [Serializer] [ObjectNormalizer] Use bool filter when FILTER_BOOL is set
      [HttpClient][Mailer] Revert "Let curl handle transfer encoding", use HTTP/1.1 for Mailgun
      Reviewed Catalan missing translations
      [AssetMapper] Upgrade importmap polyfill
      [HttpClient] Fix initializing InformationalChunk
      Fix typo: synchronous -> synchronously
      forward exceptions caught in the AbstractObjectNormalizer
      [Serializer] Check if exception message in test is correct
      [Serializer] Check if exception message in test is correct
      take the new DOM HTMLElement class into account
      Ibexa is sponsoring Symfony 5.4, thanks to them! \o/
      [FrameworkBundle] Fix warming up routes
      [VarDumper] Fix `FFICaster` test to be platform-adaptable
      [String] Add `alias` case to `EnglishInflector`
      [FrameworkBundle] Throw runtime exception when trying to use asset-mapper while http-client is disabled
      [SecurityBundle] Remove unused memory users’ `name` attribute from the XSD
      [VarExporter] generate __doUnserialize() method in ProxyHelper::generateLazyProxy()
      Double check if pcntl function exists
      Add additional headers in Scaleway bridge
      [VarDumper] Fix FFI caster test
      ...
    nicolas-grekas committed Jun 28, 2024
    Copy the full SHA
    9b88a6a View commit details

Commits on Jul 6, 2024

  1. Update .gitattributes

    fabpot committed Jul 6, 2024
    Copy the full SHA
    918489d View commit details
  2. Copy the full SHA
    af1d241 View commit details

Commits on Jul 9, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1:
      fix merge
      [AssetMapper] Split test dirs in tests
      Fix typo
      Change incorrect check for the `stateless` request attribute
      [Validator] add setGroupProvider to AttributeLoader
      use copy() instead of rename() on Windows
      test: kebab-case to snake_case
      [PropertyInfo] Handle collection in PhpStan same as PhpDoc
      [Messenger] Passing to `WorkerMessageRetriedEvent` envelope with actual stamps after sent
    xabbuh committed Jul 9, 2024
    Copy the full SHA
    641b350 View commit details

Commits on Jul 17, 2024

  1. Use createMock

    OskarStark authored and xabbuh committed Jul 17, 2024
    Copy the full SHA
    5183786 View commit details

Commits on Jul 23, 2024

  1. Copy the full SHA
    4d00525 View commit details

Commits on Aug 2, 2024

  1. Copy the full SHA
    3ddfefd View commit details

Commits on Aug 5, 2024

  1. Copy the full SHA
    f342bd0 View commit details

Commits on Aug 6, 2024

  1. Code style change in @PER-CS2.0 affecting @Symfony (parentheses f…

    …or anonymous classes)
    bonroyage authored and derrabus committed Aug 6, 2024
    Copy the full SHA
    6940f83 View commit details
  2. minor #57901 Code style change in `@PER-CS2.0 affecting `@Symfony…

    …`` (parentheses for anonymous classes) (bonroyage)
    
    This PR was squashed before being merged into the 7.2 branch.
    
    Discussion
    ----------
    
    Code style change in ``@PER`-CS2.0` affecting ``@Symfony`` (parentheses for anonymous classes)
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | no
    | Deprecations? | no
    | Issues        | -
    | License       | MIT
    
    I have created a PR (PHP-CS-Fixer/PHP-CS-Fixer#8140) in the PHP-CS-Fixer repo to bring the ``@PER`-CS2.0` ruleset in line with the specifications on the `new_with_parentheses` rule for anonymous classes. Since the ``@Symfony`` ruleset builds upon the ``@PER`-CS2.0` ruleset, they would like confirmation that the Symfony community is OK with this change affecting the Symfony ruleset as well. Should it not be, I'll push another commit there ensuring that the change does not affect ``@Symfony``.
    
    Therefore, this PR is not meant to be merged, but function as an RFC to get your opinion and show the effect it would have when applied to the Symfony source.
    
    Commits
    -------
    
    506e0dd327 Code style change in ``@PER`-CS2.0` affecting ``@Symfony`` (parentheses for anonymous classes)
    derrabus committed Aug 6, 2024
    Copy the full SHA
    a58a667 View commit details

Commits on Aug 12, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1: (31 commits)
      [Serializer] Remove useless calls to `func_get_arg()`
      fix tests using Twig 3.12
      skip tests requiring the intl extension if it's not installed
      🐛 throw ParseException on invalid date
      [FrameworkBundle] Re-remove redundant name attribute from `default_context`
      fix permitted data type of the default choice
      [ExpressionLanguage] Improve test coverage
      Fix invalid phpdoc in ContainerBuilder
      [HttpKernel] [WebProfileBundle] Fix Routing panel for URLs with a colon
      [Form] NumberType: Fix parsing of numbers in exponential notation with negative exponent
      Fix importing PHP config in prepend extension method
      [Messenger] Prevent waiting time to overflow when using long delays
      [Security] consistent singular/plural translation in Dutch
      reset the validation context after validating nested constraints
      do not duplicate directory separators
      fix handling empty data in ValueToDuplicatesTransformer
      fix compatibility with redis extension 6.0.3+
      synchronize unsupported scheme tests
      [String] Fixed Quorum plural, that was inflected to be only "Quora" and never "Quorums"
      Fix symfony/kaz-info-teh-notifier package
      ...
    xabbuh committed Aug 12, 2024
    Copy the full SHA
    cff427c View commit details

Commits on Aug 13, 2024

  1. Copy the full SHA
    15d5e7b View commit details
  2. feature #57915 [Messenger] Allow setting retry delay by RecoverableEx…

    …ceptionInterface (valtzu)
    
    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    [Messenger] Allow setting retry delay by RecoverableExceptionInterface
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | yes
    | Deprecations? | no
    | Issues        | Fix #57756
    | License       | MIT
    
    Allow overriding retry delay from the retry strategy by providing it in the exception. Example use case is retrying http request based on `Retry-After` header.
    
    Commits
    -------
    
    68a096cdb2 Allow setting retry delay by RecoverableExceptionInterface
    nicolas-grekas committed Aug 13, 2024
    Copy the full SHA
    1856245 View commit details

Commits on Aug 19, 2024

  1. Add previous to the exception output

    ToshY authored and fabpot committed Aug 19, 2024
    Copy the full SHA
    390d685 View commit details

Commits on Aug 22, 2024

  1. Copy the full SHA
    b24a1e0 View commit details

Commits on Aug 26, 2024

  1. Copy the full SHA
    3e34b41 View commit details

Commits on Sep 3, 2024

  1. [Messenger] Allow to skip message in FailedMessagesRetryCommand

    Update CHANGELOG message and help message
    Thibaut Chieux committed Sep 3, 2024
    Copy the full SHA
    f1711be View commit details

Commits on Sep 8, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1:
      Mitigate PHPUnit deprecations
      [TwigBundle] Add support for resetting globals between HTTP requests
      [Process] Fix backwards compatibility for invalid commands
      Mitigate PHPUnit deprecations
      [Cache] Fix compatibility with Redis 6.1.0 pre-releases
      [Validator] Add Catalan and Spanish translation for `Week` constraint
      Don't use is_resource() on non-streams
      [Ldap] Fix extension deprecation
    derrabus committed Sep 8, 2024
    Copy the full SHA
    9571433 View commit details

Commits on Sep 19, 2024

  1. Copy the full SHA
    e093da8 View commit details

Commits on Sep 25, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1:
      Add PR template and auto-close PR on subtree split repositories
    nicolas-grekas committed Sep 25, 2024
    Copy the full SHA
    03375bd View commit details
  2. Copy the full SHA
    dd6fd20 View commit details
  3. relax mock class name matching

    Instead of performing some fuzzy matching on the automatically generated mock
    name (the pattern varies between different PHPUnit versions) we can simply
    use the actual exact class name.
    xabbuh committed Sep 25, 2024
    Copy the full SHA
    1ed2911 View commit details

Commits on Sep 26, 2024

  1. minor #58391 relax mock class name matching (xabbuh)

    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    relax mock class name matching
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | no
    | Deprecations? | no
    | Issues        |
    | License       | MIT
    
    Instead of performing some fuzzy matching on the automatically generated mock name (the pattern varies between different PHPUnit versions) we can simply use the actual exact class name.
    
    Commits
    -------
    
    2c99e009d09 relax mock class name matching
    nicolas-grekas committed Sep 26, 2024
    Copy the full SHA
    d5608ca View commit details
  2. Copy the full SHA
    edfc2a8 View commit details

Commits on Sep 27, 2024

  1. minor #58303 Miscellaneous tests improvements (alexandre-daubois)

    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    Miscellaneous tests improvements
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | no
    | Deprecations? | no
    | Issues        | -
    | License       | MIT
    
    Mainly missed occasions to use `assertCount()` and wrong arguments placement between expected and actual results.
    
    Also for `AbstractDivLayoutTestCase` the abstract class references data provider only defined in the subclass. I think it's a good idea to put the data providers next to the test methods instead.
    
    Commits
    -------
    
    94c43b4fbe Miscellaneous tests improvements
    fabpot committed Sep 27, 2024
    Copy the full SHA
    5136af0 View commit details
  2. feature #57270 [Messenger] Allow to skip message in `FailedMessagesRe…

    …tryCommand` (Thibaut Chieux)
    
    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    [Messenger] Allow to skip message in `FailedMessagesRetryCommand`
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | yes
    | Deprecations? | no
    | Issues        | Fix #50936
    | License       | MIT
    
    When retrying message with the command messenger:failed:retry, we have two options:
    - Retry : (It will be consumed and re-added to failed queue if it re-fail)
    - Delete
    
    But sometimes we have no clue if we can retry it or not but I now that we don't want to delete it yet. In that case we need to stop the command and go id by id. The interactive command is lost. The command should provide a way to skip a message to continue with the rest of the failed message.
    
    Commits
    -------
    
    7ec6914750 [Messenger] Allow to skip message in FailedMessagesRetryCommand
    fabpot committed Sep 27, 2024
    Copy the full SHA
    0f8bb86 View commit details

Commits on Oct 6, 2024

  1. [Messenger] Notify transports which messages are still being processe…

    …d, using `pcntl_alarm()`
    HypeMC committed Oct 6, 2024
    Copy the full SHA
    4bf5868 View commit details

Commits on Oct 13, 2024

  1. Copy the full SHA
    8137d56 View commit details

Commits on Oct 14, 2024

  1. Copy the full SHA
    b61b332 View commit details

Commits on Oct 17, 2024

  1. Copy the full SHA
    c139768 View commit details

Commits on Oct 22, 2024

  1. bug #58594 [Messenger] Check for #[AsMessage] attributes on parents…

    … (HypeMC)
    
    This PR was merged into the 7.2 branch.
    
    Discussion
    ----------
    
    [Messenger] Check for `#[AsMessage]` attributes on parents
    
    | Q             | A
    | ------------- | ---
    | Branch?       | 7.2
    | Bug fix?      | no
    | New feature?  | no
    | Deprecations? | no
    | Issues        | -
    | License       | MIT
    
    I've been testing the `#[AsMessage]` attribute and noticed that the attribute is not being read from parent classes or interfaces. This is different to how the `routing` option works, so currently, attributes can't fully replace the option.
    
    Commits
    -------
    
    a164474fe36 [Messenger] Check for `#[AsMessage]` attributes on parents
    nicolas-grekas committed Oct 22, 2024
    Copy the full SHA
    ed10b36 View commit details

Commits on Nov 6, 2024

  1. Copy the full SHA
    3722a28 View commit details

Commits on Nov 9, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1:
      [AssetMapper] Fix `JavaScriptImportPathCompiler` regex for non-latin characters
      Definition::$class may not be class-string
      require Cache component versions compatible with Redis 6.1
      [Twitter][Notifier] Fix post INIT upload
      [Messenger][RateLimiter] fix additional message handled when using a rate limiter
      [Serializer] Revert default groups
      [Serializer] fixed object normalizer for a class with `cancel` method
      Fix bucket size reduce when previously created with bigger size
    xabbuh committed Nov 9, 2024
    Copy the full SHA
    ca6f254 View commit details

Commits on Nov 25, 2024

  1. Copy the full SHA
    e80e728 View commit details

Commits on Nov 26, 2024

  1. Merge branch '7.1' into 7.2

    * 7.1:
      fix test
      [Messenger] fix `Envelope::all()` conditional return docblock
    xabbuh committed Nov 26, 2024
    Copy the full SHA
    2512b9b View commit details
Showing with 806 additions and 164 deletions.
  1. +29 −0 Attribute/AsMessage.php
  2. +11 −0 CHANGELOG.md
  3. +7 −8 Command/AbstractFailedMessagesCommand.php
  4. +25 −7 Command/ConsumeMessagesCommand.php
  5. +7 −7 Command/DebugCommand.php
  6. +6 −6 Command/FailedMessagesRemoveCommand.php
  7. +33 −8 Command/FailedMessagesRetryCommand.php
  8. +8 −8 Command/FailedMessagesShowCommand.php
  9. +4 −6 Command/SetupTransportsCommand.php
  10. +67 −3 Command/StatsCommand.php
  11. +17 −17 DependencyInjection/MessengerPass.php
  12. +4 −5 Envelope.php
  13. +21 −0 Event/WorkerMessageSkipEvent.php
  14. +7 −2 EventListener/SendFailedMessageForRetryListener.php
  15. +17 −0 EventListener/SendFailedMessageToFailureTransportListener.php
  16. +2 −2 Exception/DelayedMessageHandlingException.php
  17. +2 −2 Exception/HandlerFailedException.php
  18. +2 −0 Exception/RecoverableExceptionInterface.php
  19. +9 −0 Exception/RecoverableMessageHandlingException.php
  20. +1 −1 Exception/ValidationFailedException.php
  21. +7 −1 Exception/WrappedExceptionsInterface.php
  22. +6 −0 Exception/WrappedExceptionsTrait.php
  23. +4 −4 HandleTrait.php
  24. +2 −2 Handler/Acknowledger.php
  25. +1 −1 Message/RedispatchMessage.php
  26. +3 −4 MessageBus.php
  27. +4 −4 Middleware/DispatchAfterCurrentBusMiddleware.php
  28. +2 −2 Middleware/HandleMessageMiddleware.php
  29. +2 −2 Middleware/RouterContextMiddleware.php
  30. +1 −1 Middleware/SendMessageMiddleware.php
  31. +2 −2 Middleware/TraceableMiddleware.php
  32. +4 −4 Retry/MultiplierRetryStrategy.php
  33. +5 −8 RoutableMessageBus.php
  34. +37 −0 Tests/Command/FailedMessagesRetryCommandTest.php
  35. +44 −0 Tests/Command/StatsCommandTest.php
  36. +2 −2 Tests/DependencyInjection/MessengerPassTest.php
  37. +32 −0 Tests/EventListener/SendFailedMessageForRetryListenerTest.php
  38. +1 −1 Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php
  39. +1 −1 Tests/Exception/HandlerFailedExceptionTest.php
  40. +19 −0 Tests/Fixtures/DummyMessageInterfaceWithAttribute1.php
  41. +19 −0 Tests/Fixtures/DummyMessageInterfaceWithAttribute2.php
  42. +19 −0 Tests/Fixtures/DummyMessageWithAttribute.php
  43. +25 −0 Tests/Fixtures/DummyMessageWithInterfaceWithAttribute.php
  44. +19 −0 Tests/Fixtures/DummyMessageWithParentWithAttribute.php
  45. +1 −1 Tests/Handler/HandleDescriptorTest.php
  46. +10 −9 Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php
  47. +7 −7 Tests/Middleware/HandleMessageMiddlewareTest.php
  48. +1 −1 Tests/Middleware/SendMessageMiddlewareTest.php
  49. +1 −1 Tests/Middleware/TraceableMiddlewareTest.php
  50. +1 −1 Tests/Stamp/TransportNamesStampTest.php
  51. +59 −0 Tests/Transport/Sender/SendersLocatorTest.php
  52. +0 −1 Tests/Transport/Serialization/SerializerTest.php
  53. +81 −2 Tests/WorkerTest.php
  54. +3 −4 TraceableMessageBus.php
  55. +1 −1 Transport/InMemory/InMemoryTransport.php
  56. +27 −0 Transport/Receiver/KeepaliveReceiverInterface.php
  57. +33 −1 Transport/Sender/SendersLocator.php
  58. +1 −3 Transport/Serialization/Normalizer/FlattenExceptionNormalizer.php
  59. +1 −1 Transport/Serialization/PhpSerializer.php
  60. +2 −2 Transport/Serialization/Serializer.php
  61. +30 −0 Worker.php
  62. +3 −5 WorkerMetadata.php
  63. +4 −3 composer.json
29 changes: 29 additions & 0 deletions Attribute/AsMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Attribute;

/**
* Attribute for configuring message routing.
*
* @author Pierre Rineau pierre.rineau@processus.org>
*/
#[\Attribute(\Attribute::TARGET_CLASS | \Attribute::IS_REPEATABLE)]
class AsMessage
{
public function __construct(
/**
* Name of the transports to which the message should be routed.
*/
public null|string|array $transport = null,
) {
}
}
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
CHANGELOG
=========

7.2
---

* Add `$previous` to the exception output at the `messenger:failed:show` command
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
* Add `--format` option to the `messenger:stats` command
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
* Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it
* Add the ability to asynchronously notify transports about which messages are still being processed by the worker, using `pcntl_alarm()`

7.1
---

15 changes: 7 additions & 8 deletions Command/AbstractFailedMessagesCommand.php
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io): v
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) {
$io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
$io->writeln(\sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
}
$io->newLine();

@@ -148,19 +148,19 @@ protected function printPendingMessagesMessage(ReceiverInterface $receiver, Symf
if (1 === $receiver->getMessageCount()) {
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
} else {
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
$io->writeln(\sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
}
}
}

protected function getReceiver(?string $name = null): ReceiverInterface
{
if (null === $name ??= $this->globalFailureReceiverName) {
throw new InvalidArgumentException(sprintf('No default failure transport is defined. Available transports are: "%s".', implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
throw new InvalidArgumentException(\sprintf('No default failure transport is defined. Available transports are: "%s".', implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
}

if (!$this->failureTransports->has($name)) {
throw new InvalidArgumentException(sprintf('The "%s" failure transport was not found. Available transports are: "%s".', $name, implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
throw new InvalidArgumentException(\sprintf('The "%s" failure transport was not found. Available transports are: "%s".', $name, implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
}

return $this->failureTransports->get($name);
@@ -182,6 +182,7 @@ private function createCloner(): ?ClonerInterface
Caster::PREFIX_VIRTUAL.'file' => $flattenException->getFile(),
Caster::PREFIX_VIRTUAL.'line' => $flattenException->getLine(),
Caster::PREFIX_VIRTUAL.'trace' => new TraceStub($flattenException->getTrace()),
Caster::PREFIX_VIRTUAL.'previous' => $flattenException->getPrevious(),
];
}]);

@@ -194,9 +195,9 @@ protected function printWarningAvailableFailureTransports(SymfonyStyle $io, ?str
$failureTransportsCount = \count($failureTransports);
if ($failureTransportsCount > 1) {
$io->writeln([
sprintf('> Loading messages from the <comment>global</comment> failure transport <comment>%s</comment>.', $failureTransportName),
\sprintf('> Loading messages from the <comment>global</comment> failure transport <comment>%s</comment>.', $failureTransportName),
'> To use a different failure transport, pass <comment>--transport=</comment>.',
sprintf('> Available failure transports are: <comment>%s</comment>', implode(', ', $failureTransports)),
\sprintf('> Available failure transports are: <comment>%s</comment>', implode(', ', $failureTransports)),
"\n",
]);
}
@@ -233,8 +234,6 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
$ids[] = $this->getMessageId($envelope);
}
$suggestions->suggestValues($ids);

return;
}
}
}
32 changes: 25 additions & 7 deletions Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
@@ -43,6 +43,8 @@
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
{
private const DEFAULT_KEEPALIVE_INTERVAL = 5;

private ?Worker $worker = null;

public function __construct(
@@ -75,6 +77,7 @@ protected function configure(): void
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -124,6 +127,13 @@ protected function configure(): void
;
}

protected function initialize(InputInterface $input, OutputInterface $output): void
{
if ($input->hasParameterOption('--keepalive')) {
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
}
}

protected function interact(InputInterface $input, OutputInterface $output): void
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -137,7 +147,7 @@ protected function interact(InputInterface $input, OutputInterface $output): voi

$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
$io->writeln(\sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}

$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
@@ -158,9 +168,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
foreach ($receiverNames as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
$message .= \sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}

throw new RuntimeException($message);
@@ -187,7 +197,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$stopsWhen = [];
if (null !== $limit = $input->getOption('limit')) {
if (!is_numeric($limit) || 0 >= $limit) {
throw new InvalidOptionException(sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
throw new InvalidOptionException(\sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
}

$stopsWhen[] = "processed {$limit} messages";
@@ -206,7 +216,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

if (null !== $timeLimit = $input->getOption('time-limit')) {
if (!is_numeric($timeLimit) || 0 >= $timeLimit) {
throw new InvalidOptionException(sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit));
throw new InvalidOptionException(\sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit));
}

$stopsWhen[] = "been running for {$timeLimit}s";
@@ -216,7 +226,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));
$io->success(\sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));

if ($stopsWhen) {
$last = array_pop($stopsWhen);
@@ -264,7 +274,7 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti

public function getSubscribedSignals(): array
{
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
}

public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -273,6 +283,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
return false;
}

if (\SIGALRM === $signal) {
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);

$this->worker->keepalive($this->getApplication()->getAlarmInterval());

return false;
}

$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);

$this->worker->stop();
14 changes: 7 additions & 7 deletions Command/DebugCommand.php
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ public function __construct(
protected function configure(): void
{
$this
->addArgument('bus', InputArgument::OPTIONAL, sprintf('The bus id (one of "%s")', implode('", "', array_keys($this->mapping))))
->addArgument('bus', InputArgument::OPTIONAL, \sprintf('The bus id (one of "%s")', implode('", "', array_keys($this->mapping))))
->setHelp(<<<'EOF'
The <info>%command.name%</info> command displays all messages that can be
dispatched using the message buses:
@@ -62,7 +62,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$mapping = $this->mapping;
if ($bus = $input->getArgument('bus')) {
if (!isset($mapping[$bus])) {
throw new RuntimeException(sprintf('Bus "%s" does not exist. Known buses are "%s".', $bus, implode('", "', array_keys($this->mapping))));
throw new RuntimeException(\sprintf('Bus "%s" does not exist. Known buses are "%s".', $bus, implode('", "', array_keys($this->mapping))));
}
$mapping = [$bus => $mapping[$bus]];
}
@@ -73,16 +73,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$tableRows = [];
foreach ($handlersByMessage as $message => $handlers) {
if ($description = self::getClassDescription($message)) {
$tableRows[] = [sprintf('<comment>%s</>', $description)];
$tableRows[] = [\sprintf('<comment>%s</>', $description)];
}

$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
$tableRows[] = [\sprintf('<fg=cyan>%s</fg=cyan>', $message)];
foreach ($handlers as $handler) {
$tableRows[] = [
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
\sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
];
if ($handlerDescription = self::getClassDescription($handler[0])) {
$tableRows[] = [sprintf(' <comment>%s</>', $handlerDescription)];
$tableRows[] = [\sprintf(' <comment>%s</>', $handlerDescription)];
}
}
$tableRows[] = [''];
@@ -93,7 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$io->newLine();
$io->table([], $tableRows);
} else {
$io->warning(sprintf('No handled message found in bus "%s".', $bus));
$io->warning(\sprintf('No handled message found in bus "%s".', $bus));
}
}

12 changes: 6 additions & 6 deletions Command/FailedMessagesRemoveCommand.php
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$shouldDisplayMessages = $input->getOption('show-messages') || 1 === $idsCount;

if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $failureTransportName));
throw new RuntimeException(\sprintf('The "%s" receiver does not support removing specific messages.', $failureTransportName));
}

if ($shouldDeleteAllMessages) {
@@ -101,7 +101,7 @@ private function removeMessagesById(array $ids, ListableReceiverInterface $recei
}

if (null === $envelope) {
$io->error(sprintf('The message with id "%s" was not found.', $id));
$io->error(\sprintf('The message with id "%s" was not found.', $id));
continue;
}

@@ -112,9 +112,9 @@ private function removeMessagesById(array $ids, ListableReceiverInterface $recei
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
$receiver->reject($envelope);

$io->success(sprintf('Message with id %s removed.', $id));
$io->success(\sprintf('Message with id %s removed.', $id));
} else {
$io->note(sprintf('Message with id %s not removed.', $id));
$io->note(\sprintf('Message with id %s not removed.', $id));
}
}
}
@@ -123,7 +123,7 @@ private function removeAllMessages(ListableReceiverInterface $receiver, SymfonyS
{
if (!$shouldForce) {
if ($receiver instanceof MessageCountAwareInterface) {
$question = sprintf('Do you want to permanently remove all (%d) messages?', $receiver->getMessageCount());
$question = \sprintf('Do you want to permanently remove all (%d) messages?', $receiver->getMessageCount());
} else {
$question = 'Do you want to permanently remove all failed messages?';
}
@@ -143,6 +143,6 @@ private function removeAllMessages(ListableReceiverInterface $receiver, SymfonyS
++$count;
}

$io->note(sprintf('%d messages were removed.', $count));
$io->note(\sprintf('%d messages were removed.', $count));
}
}
41 changes: 33 additions & 8 deletions Command/FailedMessagesRetryCommand.php
Original file line number Diff line number Diff line change
@@ -23,9 +23,11 @@
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\MessageDecodingFailedStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@@ -39,6 +41,8 @@
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
{
private const DEFAULT_KEEPALIVE_INTERVAL = 5;

private bool $shouldStop = false;
private bool $forceExit = false;
private ?Worker $worker = null;
@@ -62,14 +66,15 @@ protected function configure(): void
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> retries message in the failure transport.
<info>php %command.full_name%</info>
The command will interactively ask if each message should be retried
or discarded.
The command will interactively ask if each message should be retried,
discarded or skipped.
Some transports support retrying a specific message id, which comes
from the <info>messenger:failed:show</info> command.
@@ -85,6 +90,13 @@ protected function configure(): void
;
}

protected function initialize(InputInterface $input, OutputInterface $output): void
{
if ($input->hasParameterOption('--keepalive')) {
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
}
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
@@ -107,7 +119,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);

$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));
$io->writeln(\sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));

$shouldForce = $input->getOption('force');
$ids = $input->getArgument('id');
@@ -132,7 +144,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

public function getSubscribedSignals(): array
{
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
}

public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -141,6 +153,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
return false;
}

if (\SIGALRM === $signal) {
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);

$this->worker->keepalive($this->getApplication()->getAlarmInterval());

return false;
}

$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);

$this->worker->stop();
@@ -199,12 +219,13 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
$this->displaySingleMessage($envelope, $io);

if ($envelope->last(MessageDecodingFailedStamp::class)) {
throw new \RuntimeException(sprintf('The message with id "%s" could not decoded, it can only be shown or removed.', $this->getMessageId($envelope) ?? '?'));
throw new \RuntimeException(\sprintf('The message with id "%s" could not decoded, it can only be shown or removed.', $this->getMessageId($envelope) ?? '?'));
}

$this->forceExit = true;
try {
$shouldHandle = $shouldForce || 'retry' === $io->choice('Please select an action', ['retry', 'delete'], 'retry');
$choice = $io->choice('Please select an action', ['retry', 'delete', 'skip'], 'retry');
$shouldHandle = $shouldForce || 'retry' === $choice;
} finally {
$this->forceExit = false;
}
@@ -213,6 +234,10 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
return;
}

if ('skip' === $choice) {
$this->eventDispatcher->dispatch(new WorkerMessageSkipEvent($envelope, $envelope->last(SentToFailureTransportStamp::class)->getOriginalReceiverName()));
}

$messageReceivedEvent->shouldHandle(false);
$receiver->reject($envelope);
};
@@ -240,7 +265,7 @@ private function retrySpecificIds(string $failureTransportName, array $ids, Symf
$receiver = $this->getReceiver($failureTransportName);

if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
throw new RuntimeException(\sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
}

foreach ($ids as $id) {
@@ -251,7 +276,7 @@ private function retrySpecificIds(string $failureTransportName, array $ids, Symf
$this->phpSerializer?->rejectPhpIncompleteClass();
}
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
throw new RuntimeException(\sprintf('The message "%s" was not found.', $id));
}

$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
16 changes: 8 additions & 8 deletions Command/FailedMessagesShowCommand.php
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$this->printPendingMessagesMessage($receiver, $io);

if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $failureTransportName));
throw new RuntimeException(\sprintf('The "%s" receiver does not support listing or showing specific messages.', $failureTransportName));
}

if ($input->getOption('stats')) {
@@ -93,7 +93,7 @@ private function listMessages(?string $failedTransportName, SymfonyStyle $io, in
$rows = [];

if ($classFilter) {
$io->comment(sprintf('Displaying only \'%s\' messages', $classFilter));
$io->comment(\sprintf('Displaying only \'%s\' messages', $classFilter));
}

$this->phpSerializer?->acceptPhpIncompleteClass();
@@ -132,12 +132,12 @@ private function listMessages(?string $failedTransportName, SymfonyStyle $io, in
$io->table(['Id', 'Class', 'Failed at', 'Error'], $rows);

if ($rowsCount === $max) {
$io->comment(sprintf('Showing first %d messages.', $max));
$io->comment(\sprintf('Showing first %d messages.', $max));
} elseif ($classFilter) {
$io->comment(sprintf('Showing %d message(s).', $rowsCount));
$io->comment(\sprintf('Showing %d message(s).', $rowsCount));
}

$io->comment(sprintf('Run <comment>messenger:failed:show {id} --transport=%s -vv</comment> to see message details.', $failedTransportName));
$io->comment(\sprintf('Run <comment>messenger:failed:show {id} --transport=%s -vv</comment> to see message details.', $failedTransportName));
}

private function listMessagesPerClass(?string $failedTransportName, SymfonyStyle $io, int $max): void
@@ -183,15 +183,15 @@ private function showMessage(?string $failedTransportName, string $id, SymfonySt
$this->phpSerializer?->rejectPhpIncompleteClass();
}
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
throw new RuntimeException(\sprintf('The message "%s" was not found.', $id));
}

$this->displaySingleMessage($envelope, $io);

$io->writeln([
'',
sprintf(' Run <comment>messenger:failed:retry %s --transport=%s</comment> to retry this message.', $id, $failedTransportName),
sprintf(' Run <comment>messenger:failed:remove %s --transport=%s</comment> to delete it.', $id, $failedTransportName),
\sprintf(' Run <comment>messenger:failed:retry %s --transport=%s</comment> to retry this message.', $id, $failedTransportName),
\sprintf(' Run <comment>messenger:failed:remove %s --transport=%s</comment> to delete it.', $id, $failedTransportName),
]);
}
}
10 changes: 4 additions & 6 deletions Command/SetupTransportsCommand.php
Original file line number Diff line number Diff line change
@@ -60,23 +60,23 @@ protected function execute(InputInterface $input, OutputInterface $output): int
// do we want to set up only one transport?
if ($transport = $input->getArgument('transport')) {
if (!$this->transportLocator->has($transport)) {
throw new \RuntimeException(sprintf('The "%s" transport does not exist.', $transport));
throw new \RuntimeException(\sprintf('The "%s" transport does not exist.', $transport));
}
$transportNames = [$transport];
}

foreach ($transportNames as $id => $transportName) {
$transport = $this->transportLocator->get($transportName);
if (!$transport instanceof SetupableTransportInterface) {
$io->note(sprintf('The "%s" transport does not support setup.', $transportName));
$io->note(\sprintf('The "%s" transport does not support setup.', $transportName));
continue;
}

try {
$transport->setup();
$io->success(sprintf('The "%s" transport was set up successfully.', $transportName));
$io->success(\sprintf('The "%s" transport was set up successfully.', $transportName));
} catch (\Exception $e) {
throw new \RuntimeException(sprintf('An error occurred while setting up the "%s" transport: ', $transportName).$e->getMessage(), 0, $e);
throw new \RuntimeException(\sprintf('An error occurred while setting up the "%s" transport: ', $transportName).$e->getMessage(), 0, $e);
}
}

@@ -87,8 +87,6 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
{
if ($input->mustSuggestArgumentValuesFor('transport')) {
$suggestions->suggestValues($this->transportNames);

return;
}
}
}
70 changes: 67 additions & 3 deletions Command/StatsCommand.php
Original file line number Diff line number Diff line change
@@ -14,11 +14,15 @@
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;

/**
@@ -38,6 +42,7 @@ protected function configure(): void
{
$this
->addArgument('transport_names', InputArgument::IS_ARRAY | InputArgument::OPTIONAL, 'List of transports\' names')
->addOption('format', '', InputOption::VALUE_REQUIRED, \sprintf('The output format ("%s")', implode('", "', $this->getAvailableFormatOptions())), 'txt')
->setHelp(<<<EOF
The <info>%command.name%</info> command counts the messages for all the transports:
@@ -46,6 +51,10 @@ protected function configure(): void
Or specific transports only:
<info>php %command.full_name% <transportNames></info>
The <info>--format</info> option specifies the format of the command output:
<info>php %command.full_name% --format=json</info>
EOF
)
;
@@ -55,6 +64,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$format = $input->getOption('format');
if ('text' === $format) {
trigger_deprecation('symfony/messenger', '7.2', 'The "text" format is deprecated, use "txt" instead.');

$format = 'txt';
}
if (!\in_array($format, $this->getAvailableFormatOptions(), true)) {
throw new InvalidArgumentException('Invalid output format.');
}

$transportNames = $this->transportNames;
if ($input->getArgument('transport_names')) {
$transportNames = $input->getArgument('transport_names');
@@ -64,7 +83,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$uncountableTransports = [];
foreach ($transportNames as $transportName) {
if (!$this->transportLocator->has($transportName)) {
$io->warning(sprintf('The "%s" transport does not exist.', $transportName));
if ($this->formatSupportsWarnings($format)) {
$io->warning(\sprintf('The "%s" transport does not exist.', $transportName));
}

continue;
}
@@ -77,12 +98,55 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$outputTable[] = [$transportName, $transport->getMessageCount()];
}

match ($format) {
'txt' => $this->outputText($io, $outputTable, $uncountableTransports),
'json' => $this->outputJson($io, $outputTable, $uncountableTransports),
};

return 0;
}

private function outputText(SymfonyStyle $io, array $outputTable, array $uncountableTransports): void
{
$io->table(['Transport', 'Count'], $outputTable);

if ($uncountableTransports) {
$io->note(sprintf('Unable to get message count for the following transports: "%s".', implode('", "', $uncountableTransports)));
$io->note(\sprintf('Unable to get message count for the following transports: "%s".', implode('", "', $uncountableTransports)));
}
}

return 0;
private function outputJson(SymfonyStyle $io, array $outputTable, array $uncountableTransports): void
{
$output = ['transports' => []];
foreach ($outputTable as [$transportName, $count]) {
$output['transports'][$transportName] = ['count' => $count];
}

if ($uncountableTransports) {
$output['uncountable_transports'] = $uncountableTransports;
}

$io->writeln(json_encode($output, \JSON_PRETTY_PRINT));
}

private function formatSupportsWarnings(string $format): bool
{
return match ($format) {
'txt' => true,
'json' => false,
};
}

public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestOptionValuesFor('format')) {
$suggestions->suggestValues($this->getAvailableFormatOptions());
}
}

/** @return string[] */
private function getAvailableFormatOptions(): array
{
return ['txt', 'json'];
}
}
34 changes: 17 additions & 17 deletions DependencyInjection/MessengerPass.php
Original file line number Diff line number Diff line change
@@ -61,14 +61,14 @@ private function registerHandlers(ContainerBuilder $container, array $busIds): v
foreach ($container->findTaggedServiceIds('messenger.message_handler', true) as $serviceId => $tags) {
foreach ($tags as $tag) {
if (isset($tag['bus']) && !\in_array($tag['bus'], $busIds, true)) {
throw new RuntimeException(sprintf('Invalid handler service "%s": bus "%s" specified on the tag "messenger.message_handler" does not exist (known ones are: "%s").', $serviceId, $tag['bus'], implode('", "', $busIds)));
throw new RuntimeException(\sprintf('Invalid handler service "%s": bus "%s" specified on the tag "messenger.message_handler" does not exist (known ones are: "%s").', $serviceId, $tag['bus'], implode('", "', $busIds)));
}

$className = $this->getServiceClass($container, $serviceId);
$r = $container->getReflectionClass($className);

if (null === $r) {
throw new RuntimeException(sprintf('Invalid service "%s": class "%s" does not exist.', $serviceId, $className));
throw new RuntimeException(\sprintf('Invalid service "%s": class "%s" does not exist.', $serviceId, $className));
}

if (isset($tag['handles'])) {
@@ -88,7 +88,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds): v
$message = $options;
$options = [];
} else {
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', get_debug_type($options), $message, $serviceId));
throw new RuntimeException(\sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', get_debug_type($options), $message, $serviceId));
}
}

@@ -104,22 +104,22 @@ private function registerHandlers(ContainerBuilder $container, array $busIds): v

if (isset($options['bus'])) {
if (!\in_array($options['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : \sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);

throw new RuntimeException(sprintf('Invalid configuration '.$messageLocation.' for message "%s": bus "%s" does not exist.', $message, $options['bus']));
throw new RuntimeException(\sprintf('Invalid configuration '.$messageLocation.' for message "%s": bus "%s" does not exist.', $message, $options['bus']));
}

$buses = [$options['bus']];
}

if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : \sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);

throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" '.$messageLocation.' not found.', $serviceId, $message));
throw new RuntimeException(\sprintf('Invalid handler service "%s": class or interface "%s" '.$messageLocation.' not found.', $serviceId, $message));
}

if (!$r->hasMethod($method)) {
throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" does not exist.', $serviceId, $r->getName(), $method));
throw new RuntimeException(\sprintf('Invalid handler service "%s": method "%s::%s()" does not exist.', $serviceId, $r->getName(), $method));
}

if ('__invoke' !== $method || '' !== $fromTransport) {
@@ -138,7 +138,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds): v
}

if (null === $message) {
throw new RuntimeException(sprintf('Invalid handler service "%s": the list of messages to handle is empty.', $serviceId));
throw new RuntimeException(\sprintf('Invalid handler service "%s": the list of messages to handle is empty.', $serviceId));
}
}
}
@@ -197,11 +197,11 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
try {
$method = $handlerClass->getMethod($methodName);
} catch (\ReflectionException) {
throw new RuntimeException(sprintf('Invalid handler service "%s": class "%s" must have an "%s()" method.', $serviceId, $handlerClass->getName(), $methodName));
throw new RuntimeException(\sprintf('Invalid handler service "%s": class "%s" must have an "%s()" method.', $serviceId, $handlerClass->getName(), $methodName));
}

if (0 === $method->getNumberOfRequiredParameters()) {
throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" requires at least one argument, first one being the message it handles.', $serviceId, $handlerClass->getName(), $methodName));
throw new RuntimeException(\sprintf('Invalid handler service "%s": method "%s::%s()" requires at least one argument, first one being the message it handles.', $serviceId, $handlerClass->getName(), $methodName));
}

$parameters = $method->getParameters();
@@ -210,7 +210,7 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
$type = $parameters[0]->getType();

if (!$type) {
throw new RuntimeException(sprintf('Invalid handler service "%s": argument "$%s" of method "%s::%s()" must have a type-hint corresponding to the message class it handles.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName));
throw new RuntimeException(\sprintf('Invalid handler service "%s": argument "$%s" of method "%s::%s()" must have a type-hint corresponding to the message class it handles.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName));
}

if ($type instanceof \ReflectionUnionType) {
@@ -228,11 +228,11 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
return ('__invoke' === $methodName) ? $types : array_fill_keys($types, $methodName);
}

throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::__invoke()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), implode('|', $invalidTypes)));
throw new RuntimeException(\sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::__invoke()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), implode('|', $invalidTypes)));
}

if ($type->isBuiltin()) {
throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::%s()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName, $type instanceof \ReflectionNamedType ? $type->getName() : (string) $type));
throw new RuntimeException(\sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::%s()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName, $type instanceof \ReflectionNamedType ? $type->getName() : (string) $type));
}

return ('__invoke' === $methodName) ? [$type->getName()] : [$type->getName() => $methodName];
@@ -257,7 +257,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
$receiverClass = $this->getServiceClass($container, $id);
if (!is_subclass_of($receiverClass, ReceiverInterface::class)) {
throw new RuntimeException(sprintf('Invalid receiver "%s": class "%s" must implement interface "%s".', $id, $receiverClass, ReceiverInterface::class));
throw new RuntimeException(\sprintf('Invalid receiver "%s": class "%s" must implement interface "%s".', $id, $receiverClass, ReceiverInterface::class));
}

$receiverMapping[$id] = new Reference($id);
@@ -350,7 +350,7 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
}

if (!$container->has($messengerMiddlewareId)) {
throw new RuntimeException(sprintf('Invalid middleware: service "%s" not found.', $id));
throw new RuntimeException(\sprintf('Invalid middleware: service "%s" not found.', $id));
}

if ($container->findDefinition($messengerMiddlewareId)->isAbstract()) {
@@ -361,7 +361,7 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
}
$container->setDefinition($messengerMiddlewareId, $childDefinition);
} elseif ($arguments) {
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
throw new RuntimeException(\sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}

$middlewareReferences[$messengerMiddlewareId] = new Reference($messengerMiddlewareId);
9 changes: 4 additions & 5 deletions Envelope.php
Original file line number Diff line number Diff line change
@@ -24,16 +24,15 @@ final class Envelope
* @var array<class-string<StampInterface>, list<StampInterface>>
*/
private array $stamps = [];
private object $message;

/**
* @param object|Envelope $message
* @param StampInterface[] $stamps
*/
public function __construct(object $message, array $stamps = [])
{
$this->message = $message;

public function __construct(
private object $message,
array $stamps = [],
) {
foreach ($stamps as $stamp) {
$this->stamps[$stamp::class][] = $stamp;
}
21 changes: 21 additions & 0 deletions Event/WorkerMessageSkipEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

/**
* Dispatched when a message was skip.
*
* The event name is the class name.
*/
final class WorkerMessageSkipEvent extends AbstractWorkerMessageEvent
{
}
9 changes: 7 additions & 2 deletions EventListener/SendFailedMessageForRetryListener.php
Original file line number Diff line number Diff line change
@@ -63,7 +63,12 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void

++$retryCount;

$delay = $retryStrategy->getWaitingTime($envelope, $throwable);
$delay = null;
if ($throwable instanceof RecoverableExceptionInterface && method_exists($throwable, 'getRetryDelay')) {
$delay = $throwable->getRetryDelay();
}

$delay ??= $retryStrategy->getWaitingTime($envelope, $throwable);

$this->logger?->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);

@@ -158,6 +163,6 @@ private function getSenderForTransport(string $alias): SenderInterface
return $this->sendersLocator->get($alias);
}

throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
throw new RuntimeException(\sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
}
}
17 changes: 17 additions & 0 deletions EventListener/SendFailedMessageToFailureTransportListener.php
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
@@ -65,10 +66,26 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
$failureSender->send($envelope);
}

public function onMessageSkip(WorkerMessageSkipEvent $event): void
{
if (!$this->failureSenders->has($event->getReceiverName())) {
return;
}

$failureSender = $this->failureSenders->get($event->getReceiverName());
$envelope = $event->getEnvelope()->with(
new SentToFailureTransportStamp($event->getReceiverName()),
new DelayStamp(0),
);

$failureSender->send($envelope);
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
WorkerMessageSkipEvent::class => ['onMessageSkip', -100],
];
}
}
4 changes: 2 additions & 2 deletions Exception/DelayedMessageHandlingException.php
Original file line number Diff line number Diff line change
@@ -36,9 +36,9 @@ public function __construct(
));

if (1 === \count($exceptions)) {
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
$message = \sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
} else {
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
$message = \sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
}

parent::__construct($message, 0, $exceptions[array_key_first($exceptions)]);
4 changes: 2 additions & 2 deletions Exception/HandlerFailedException.php
Original file line number Diff line number Diff line change
@@ -26,12 +26,12 @@ public function __construct(
) {
$firstFailure = current($exceptions);

$message = sprintf('Handling "%s" failed: ', $envelope->getMessage()::class);
$message = \sprintf('Handling "%s" failed: ', $envelope->getMessage()::class);

parent::__construct(
$message.(1 === \count($exceptions)
? $firstFailure->getMessage()
: sprintf('%d handlers failed. First failure is: %s', \count($exceptions), $firstFailure->getMessage())
: \sprintf('%d handlers failed. First failure is: %s', \count($exceptions), $firstFailure->getMessage())
),
(int) $firstFailure->getCode(),
$firstFailure
2 changes: 2 additions & 0 deletions Exception/RecoverableExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
* and the message should be retried, a handler can throw such an exception.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*
* @method int|null getRetryDelay() The time to wait in milliseconds
*/
interface RecoverableExceptionInterface extends \Throwable
{
9 changes: 9 additions & 0 deletions Exception/RecoverableMessageHandlingException.php
Original file line number Diff line number Diff line change
@@ -18,4 +18,13 @@
*/
class RecoverableMessageHandlingException extends RuntimeException implements RecoverableExceptionInterface
{
public function __construct(string $message = '', int $code = 0, ?\Throwable $previous = null, private readonly ?int $retryDelay = null)
{
parent::__construct($message, $code, $previous);
}

public function getRetryDelay(): ?int
{
return $this->retryDelay;
}
}
2 changes: 1 addition & 1 deletion Exception/ValidationFailedException.php
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ public function __construct(
) {
$this->envelope = $envelope;

parent::__construct(sprintf('Message of type "%s" failed validation.', $this->violatingMessage::class));
parent::__construct(\sprintf('Message of type "%s" failed validation.', $this->violatingMessage::class));
}

public function getViolatingMessage(): object
8 changes: 7 additions & 1 deletion Exception/WrappedExceptionsInterface.php
Original file line number Diff line number Diff line change
@@ -16,10 +16,16 @@
*
* @author Jeroen <https://github.com/Jeroeny>
*/
interface WrappedExceptionsInterface
interface WrappedExceptionsInterface extends \Throwable
{
/**
* @template TException of \Throwable
*
* @param class-string<TException>|null $class
*
* @return \Throwable[]
*
* @psalm-return ($class is null ? \Throwable[] : TException[])
*/
public function getWrappedExceptions(?string $class = null, bool $recursive = false): array;
}
6 changes: 6 additions & 0 deletions Exception/WrappedExceptionsTrait.php
Original file line number Diff line number Diff line change
@@ -21,7 +21,13 @@ trait WrappedExceptionsTrait
private array $exceptions;

/**
* @template TException of \Throwable
*
* @param class-string<TException>|null $class
*
* @return \Throwable[]
*
* @psalm-return ($class is null ? \Throwable[] : TException[])
*/
public function getWrappedExceptions(?string $class = null, bool $recursive = false): array
{
8 changes: 4 additions & 4 deletions HandleTrait.php
Original file line number Diff line number Diff line change
@@ -34,21 +34,21 @@ trait HandleTrait
private function handle(object $message): mixed
{
if (!isset($this->messageBus)) {
throw new LogicException(sprintf('You must provide a "%s" instance in the "%s::$messageBus" property, but that property has not been initialized yet.', MessageBusInterface::class, static::class));
throw new LogicException(\sprintf('You must provide a "%s" instance in the "%s::$messageBus" property, but that property has not been initialized yet.', MessageBusInterface::class, static::class));
}

$envelope = $this->messageBus->dispatch($message);
/** @var HandledStamp[] $handledStamps */
$handledStamps = $envelope->all(HandledStamp::class);

if (!$handledStamps) {
throw new LogicException(sprintf('Message of type "%s" was handled zero times. Exactly one handler is expected when using "%s::%s()".', get_debug_type($envelope->getMessage()), static::class, __FUNCTION__));
throw new LogicException(\sprintf('Message of type "%s" was handled zero times. Exactly one handler is expected when using "%s::%s()".', get_debug_type($envelope->getMessage()), static::class, __FUNCTION__));
}

if (\count($handledStamps) > 1) {
$handlers = implode(', ', array_map(fn (HandledStamp $stamp): string => sprintf('"%s"', $stamp->getHandlerName()), $handledStamps));
$handlers = implode(', ', array_map(fn (HandledStamp $stamp): string => \sprintf('"%s"', $stamp->getHandlerName()), $handledStamps));

throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', get_debug_type($envelope->getMessage()), static::class, __FUNCTION__, \count($handledStamps), $handlers));
throw new LogicException(\sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', get_debug_type($envelope->getMessage()), static::class, __FUNCTION__, \count($handledStamps), $handlers));
}

return $handledStamps[0]->getResult();
4 changes: 2 additions & 2 deletions Handler/Acknowledger.php
Original file line number Diff line number Diff line change
@@ -63,14 +63,14 @@ public function isAcknowledged(): bool
public function __destruct()
{
if (null !== $this->ack) {
throw new LogicException(sprintf('The acknowledger was not called by the "%s" batch handler.', $this->handlerClass));
throw new LogicException(\sprintf('The acknowledger was not called by the "%s" batch handler.', $this->handlerClass));
}
}

private function doAck(?\Throwable $e = null, mixed $result = null): void
{
if (!$ack = $this->ack) {
throw new LogicException(sprintf('The acknowledger cannot be called twice by the "%s" batch handler.', $this->handlerClass));
throw new LogicException(\sprintf('The acknowledger cannot be called twice by the "%s" batch handler.', $this->handlerClass));
}
$this->ack = null;
$this->error = $e;
2 changes: 1 addition & 1 deletion Message/RedispatchMessage.php
Original file line number Diff line number Diff line change
@@ -29,6 +29,6 @@ public function __toString(): string
{
$message = $this->envelope instanceof Envelope ? $this->envelope->getMessage() : $this->envelope;

return sprintf('%s via %s', $message instanceof \Stringable ? (string) $message : $message::class, implode(', ', (array) $this->transportNames));
return \sprintf('%s via %s', $message instanceof \Stringable ? (string) $message : $message::class, implode(', ', (array) $this->transportNames));
}
}
7 changes: 3 additions & 4 deletions MessageBus.php
Original file line number Diff line number Diff line change
@@ -36,12 +36,11 @@ public function __construct(iterable $middlewareHandlers = [])
// $this->middlewareAggregate should be an instance of IteratorAggregate.
// When $middlewareHandlers is an Iterator, we wrap it to ensure it is lazy-loaded and can be rewound.
$this->middlewareAggregate = new class($middlewareHandlers) implements \IteratorAggregate {
private \Traversable $middlewareHandlers;
private \ArrayObject $cachedIterator;

public function __construct(\Traversable $middlewareHandlers)
{
$this->middlewareHandlers = $middlewareHandlers;
public function __construct(
private \Traversable $middlewareHandlers,
) {
}

public function getIterator(): \Traversable
8 changes: 4 additions & 4 deletions Middleware/DispatchAfterCurrentBusMiddleware.php
Original file line number Diff line number Diff line change
@@ -110,12 +110,12 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
final class QueuedEnvelope
{
private Envelope $envelope;
private StackInterface $stack;

public function __construct(Envelope $envelope, StackInterface $stack)
{
public function __construct(
Envelope $envelope,
private StackInterface $stack,
) {
$this->envelope = $envelope->withoutAll(DispatchAfterCurrentBusStamp::class);
$this->stack = $stack;
}

public function getEnvelope(): Envelope
4 changes: 2 additions & 2 deletions Middleware/HandleMessageMiddleware.php
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));

if (!\is_int($result) || 0 > $result) {
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
}

if (!$ack->isAcknowledged()) {
@@ -114,7 +114,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

if (null === $handler && !$alreadyHandled) {
if (!$this->allowNoHandlers) {
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
throw new NoHandlerForMessageException(\sprintf('No handler for message "%s".', $context['class']));
}

$this->logger?->info('No handler for message {class}', $context);
4 changes: 2 additions & 2 deletions Middleware/RouterContextMiddleware.php
Original file line number Diff line number Diff line change
@@ -30,8 +30,9 @@ public function __construct(

public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$context = $this->router->getContext();

if (!$envelope->last(ConsumedByWorkerStamp::class) || !$contextStamp = $envelope->last(RouterContextStamp::class)) {
$context = $this->router->getContext();
$envelope = $envelope->with(new RouterContextStamp(
$context->getBaseUrl(),
$context->getMethod(),
@@ -46,7 +47,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
return $stack->next()->handle($envelope, $stack);
}

$context = $this->router->getContext();
$currentBaseUrl = $context->getBaseUrl();
$currentMethod = $context->getMethod();
$currentHost = $context->getHost();
2 changes: 1 addition & 1 deletion Middleware/SendMessageMiddleware.php
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
}

if (!$this->allowNoSenders && !$sender) {
throw new NoSenderForMessageException(sprintf('No sender for message "%s".', $context['class']));
throw new NoSenderForMessageException(\sprintf('No sender for message "%s".', $context['class']));
}
}

4 changes: 2 additions & 2 deletions Middleware/TraceableMiddleware.php
Original file line number Diff line number Diff line change
@@ -64,9 +64,9 @@ public function next(): MiddlewareInterface
if ($this->stack === $nextMiddleware = $this->stack->next()) {
$this->currentEvent = 'Tail';
} else {
$this->currentEvent = sprintf('"%s"', get_debug_type($nextMiddleware));
$this->currentEvent = \sprintf('"%s"', get_debug_type($nextMiddleware));
}
$this->currentEvent .= sprintf(' on "%s"', $this->busName);
$this->currentEvent .= \sprintf(' on "%s"', $this->busName);

$this->stopwatch->start($this->currentEvent, $this->eventCategory);

8 changes: 4 additions & 4 deletions Retry/MultiplierRetryStrategy.php
Original file line number Diff line number Diff line change
@@ -47,19 +47,19 @@ public function __construct(
private float $jitter = 0.1,
) {
if ($delayMilliseconds < 0) {
throw new InvalidArgumentException(sprintf('Delay must be greater than or equal to zero: "%s" given.', $delayMilliseconds));
throw new InvalidArgumentException(\sprintf('Delay must be greater than or equal to zero: "%s" given.', $delayMilliseconds));
}

if ($multiplier < 1) {
throw new InvalidArgumentException(sprintf('Multiplier must be greater than zero: "%s" given.', $multiplier));
throw new InvalidArgumentException(\sprintf('Multiplier must be greater than zero: "%s" given.', $multiplier));
}

if ($maxDelayMilliseconds < 0) {
throw new InvalidArgumentException(sprintf('Max delay must be greater than or equal to zero: "%s" given.', $maxDelayMilliseconds));
throw new InvalidArgumentException(\sprintf('Max delay must be greater than or equal to zero: "%s" given.', $maxDelayMilliseconds));
}

if ($jitter < 0 || $jitter > 1) {
throw new InvalidArgumentException(sprintf('Jitter must be between 0 and 1: "%s" given.', $jitter));
throw new InvalidArgumentException(\sprintf('Jitter must be between 0 and 1: "%s" given.', $jitter));
}
}

13 changes: 5 additions & 8 deletions RoutableMessageBus.php
Original file line number Diff line number Diff line change
@@ -25,13 +25,10 @@
*/
class RoutableMessageBus implements MessageBusInterface
{
private ContainerInterface $busLocator;
private ?MessageBusInterface $fallbackBus;

public function __construct(ContainerInterface $busLocator, ?MessageBusInterface $fallbackBus = null)
{
$this->busLocator = $busLocator;
$this->fallbackBus = $fallbackBus;
public function __construct(
private ContainerInterface $busLocator,
private ?MessageBusInterface $fallbackBus = null,
) {
}

public function dispatch(object $envelope, array $stamps = []): Envelope
@@ -60,7 +57,7 @@ public function dispatch(object $envelope, array $stamps = []): Envelope
public function getMessageBus(string $busName): MessageBusInterface
{
if (!$this->busLocator->has($busName)) {
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
throw new InvalidArgumentException(\sprintf('Bus named "%s" does not exist.', $busName));
}

return $this->busLocator->get($busName);
37 changes: 37 additions & 0 deletions Tests/Command/FailedMessagesRetryCommandTest.php
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;

@@ -223,4 +224,40 @@ public function testCompleteIdWithSpecifiedTransport()

$this->assertSame(['2ab50dfa1fbf', '78c2da843723'], $suggestions);
}

public function testSkipRunWithServiceLocator()
{
$failureTransportName = 'failure_receiver';
$originalTransportName = 'original_receiver';

$serviceLocator = $this->createMock(ServiceLocator::class);
$receiver = $this->createMock(ListableReceiverInterface::class);

$dispatcher = new EventDispatcher();
$bus = $this->createMock(MessageBusInterface::class);

$serviceLocator->method('has')->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);

$receiver->expects($this->once())->method('find')
->willReturn(Envelope::wrap(new \stdClass(), [
new SentToFailureTransportStamp($originalTransportName),
]));

$receiver->expects($this->never())->method('ack');
$receiver->expects($this->once())->method('reject');

$command = new FailedMessagesRetryCommand(
$failureTransportName,
$serviceLocator,
$bus,
$dispatcher
);

$tester = new CommandTester($command);
$tester->setInputs(['skip']);

$tester->execute(['id' => ['10']]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
}
44 changes: 44 additions & 0 deletions Tests/Command/StatsCommandTest.php
Original file line number Diff line number Diff line change
@@ -69,6 +69,23 @@ public function testWithoutArgument()
$this->assertStringContainsString('! [NOTE] Unable to get message count for the following transports: "simple".', $display);
}

public function testWithoutArgumentJsonFormat()
{
$tester = new CommandTester($this->command);
$tester->execute(['--format' => 'json']);
$display = $tester->getDisplay();

$this->assertJsonStringEqualsJsonString('{
"transports": {
"message_countable": {"count": 6},
"another_message_countable": {"count": 6}
},
"uncountable_transports": [
"simple"
]
}', $display);
}

public function testWithOneExistingMessageCountableTransport()
{
$tester = new CommandTester($this->command);
@@ -81,6 +98,19 @@ public function testWithOneExistingMessageCountableTransport()
$this->assertStringNotContainsString(' ! [NOTE] Unable to get message count for the following transports: "simple".', $display);
}

public function testWithOneExistingMessageCountableTransportJsonFormat()
{
$tester = new CommandTester($this->command);
$tester->execute(['transport_names' => ['message_countable'], '--format' => 'json']);
$display = $tester->getDisplay();

$this->assertJsonStringEqualsJsonString('{
"transports": {
"message_countable": {"count": 6}
}
}', $display);
}

public function testWithMultipleExistingMessageCountableTransport()
{
$tester = new CommandTester($this->command);
@@ -93,6 +123,20 @@ public function testWithMultipleExistingMessageCountableTransport()
$this->assertStringNotContainsString('! [NOTE] Unable to get message count for the following transports: "simple".', $display);
}

public function testWithMultipleExistingMessageCountableTransportJsonFormat()
{
$tester = new CommandTester($this->command);
$tester->execute(['transport_names' => ['message_countable', 'another_message_countable'], '--format' => 'json']);
$display = $tester->getDisplay();

$this->assertJsonStringEqualsJsonString('{
"transports": {
"message_countable": {"count": 6},
"another_message_countable": {"count": 6}
}
}', $display);
}

public function testWithNotMessageCountableTransport()
{
$tester = new CommandTester($this->command);
4 changes: 2 additions & 2 deletions Tests/DependencyInjection/MessengerPassTest.php
Original file line number Diff line number Diff line change
@@ -622,7 +622,7 @@ public function testUnionTypeArgumentsTypeHandler()
public function testUnionBuiltinArgumentTypeHandler()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage(sprintf('Invalid handler service "%s": type-hint of argument "$message" in method "%s::__invoke()" must be a class , "string|int" given.', UnionBuiltinTypeArgumentHandler::class, UnionBuiltinTypeArgumentHandler::class));
$this->expectExceptionMessage(\sprintf('Invalid handler service "%s": type-hint of argument "$message" in method "%s::__invoke()" must be a class , "string|int" given.', UnionBuiltinTypeArgumentHandler::class, UnionBuiltinTypeArgumentHandler::class));
$container = $this->getContainerBuilder();
$container
->register(UnionBuiltinTypeArgumentHandler::class, UnionBuiltinTypeArgumentHandler::class)
@@ -800,7 +800,7 @@ private function getContainerBuilder(string $busId = 'message_bus'): ContainerBu
unset($tagAttributes['fromTransport']);
if ($reflector instanceof \ReflectionMethod) {
if (isset($tagAttributes['method'])) {
throw new LogicException(sprintf('AsMessageHandler attribute cannot declare a method on "%s::%s()".', $reflector->class, $reflector->name));
throw new LogicException(\sprintf('AsMessageHandler attribute cannot declare a method on "%s::%s()".', $reflector->class, $reflector->name));
}
$tagAttributes['method'] = $reflector->getName();
}
32 changes: 32 additions & 0 deletions Tests/EventListener/SendFailedMessageForRetryListenerTest.php
Original file line number Diff line number Diff line change
@@ -76,6 +76,38 @@ public function testRecoverableStrategyCausesRetry()
$listener->onMessageFailed($event);
}

public function testRecoverableExceptionRetryDelayOverridesStrategy()
{
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
$delayStamp = $envelope->last(DelayStamp::class);
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);

$this->assertInstanceOf(DelayStamp::class, $delayStamp);
$this->assertSame(1234, $delayStamp->getDelay());

$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
$this->assertSame(1, $redeliveryStamp->getRetryCount());

return $envelope;
});
$senderLocator = new Container();
$senderLocator->set('my_receiver', $sender);
$retryStrategy = $this->createMock(RetryStrategyInterface::class);
$retryStrategy->expects($this->never())->method('isRetryable');
$retryStrategy->expects($this->never())->method('getWaitingTime');
$retryStrategyLocator = new Container();
$retryStrategyLocator->set('my_receiver', $retryStrategy);

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);

$exception = new RecoverableMessageHandlingException('retry', retryDelay: 1234);
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

$listener->onMessageFailed($event);
}

public function testEnvelopeIsSentToTransportOnRetry()
{
$exception = new \Exception('no!');
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ public static function provideTests(): \Generator
yield 'it should not stop (1)' => [new \Exception(), false];
yield 'it should not stop (2)' => [new HandlerFailedException(new Envelope(new \stdClass()), [new \Exception()]), false];

$t = new class() extends \Exception implements StopWorkerExceptionInterface {};
$t = new class extends \Exception implements StopWorkerExceptionInterface {};
yield 'it should stop with custom exception' => [$t, true];
yield 'it should stop with core exception' => [new StopWorkerException(), true];

2 changes: 1 addition & 1 deletion Tests/Exception/HandlerFailedExceptionTest.php
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ class HandlerFailedExceptionTest extends TestCase
public function testThatStringErrorCodeConvertsToInteger()
{
$envelope = new Envelope(new \stdClass());
$exception = new class() extends \RuntimeException {
$exception = new class extends \RuntimeException {
public function __construct()
{
$this->code = 'HY000';
19 changes: 19 additions & 0 deletions Tests/Fixtures/DummyMessageInterfaceWithAttribute1.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: ['first_sender', 'third_sender'])]
interface DummyMessageInterfaceWithAttribute1
{
}
19 changes: 19 additions & 0 deletions Tests/Fixtures/DummyMessageInterfaceWithAttribute2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'second_sender')]
interface DummyMessageInterfaceWithAttribute2
{
}
19 changes: 19 additions & 0 deletions Tests/Fixtures/DummyMessageWithAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: ['first_sender', 'second_sender'])]
class DummyMessageWithAttribute implements DummyMessageInterface
{
public function __construct(
private string $message,
) {
}

public function getMessage(): string
{
return $this->message;
}
}
25 changes: 25 additions & 0 deletions Tests/Fixtures/DummyMessageWithInterfaceWithAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

class DummyMessageWithInterfaceWithAttribute implements DummyMessageInterfaceWithAttribute1, DummyMessageInterfaceWithAttribute2
{
public function __construct(
private string $message,
) {
}

public function getMessage(): string
{
return $this->message;
}
}
19 changes: 19 additions & 0 deletions Tests/Fixtures/DummyMessageWithParentWithAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'third_sender')]
class DummyMessageWithParentWithAttribute extends DummyMessageWithAttribute
{
}
2 changes: 1 addition & 1 deletion Tests/Handler/HandleDescriptorTest.php
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ public static function provideHandlers(): iterable
yield [\Closure::fromCallable(function () {}), 'Closure'];
yield [\Closure::fromCallable(new DummyCommandHandler()), DummyCommandHandler::class.'::__invoke'];
yield [\Closure::bind(\Closure::fromCallable(function () {}), new \stdClass()), 'Closure'];
yield [new class() {
yield [new class {
public function __invoke()
{
}
19 changes: 10 additions & 9 deletions Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@

use PHPUnit\Framework\AssertionFailedError;
use PHPUnit\Framework\Constraint\Callback;
use PHPUnit\Framework\MockObject\Stub\ReturnCallback;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
@@ -106,14 +105,15 @@ public function testThrowingEventsHandlingWontStopExecution()
$secondEvent,
];

$matcher = $this->exactly(3);
$handlingMiddleware->expects($matcher)
$handlingMiddleware->expects($this->exactly(3))
->method('handle')
->with($this->callback(function (Envelope $envelope) use (&$series) {
return $envelope->getMessage() === array_shift($series);
}))
->willReturnCallback(function ($envelope, StackInterface $stack) use ($matcher) {
if (2 === $matcher->getInvocationCount()) {
->willReturnCallback(function ($envelope, StackInterface $stack) {
static $call = 0;

if (2 === ++$call) {
throw new \RuntimeException('Some exception while handling first event');
}

@@ -175,14 +175,15 @@ public function testLongChainWithExceptions()
// Note: $eventL3a should not be handled.
];

$matcher = $this->exactly(7);
$handlingMiddleware->expects($matcher)
$handlingMiddleware->expects($this->exactly(7))
->method('handle')
->with($this->callback(function (Envelope $envelope) use (&$series) {
return $envelope->getMessage() === array_shift($series);
}))
->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL2a, $eventL2b, $eventL3a, $eventL3b, $matcher) {
switch ($matcher->getInvocationCount()) {
->willReturnCallback(function ($envelope, StackInterface $stack) use ($eventBus, $eventL2a, $eventL2b, $eventL3a, $eventL3b) {
static $call = 0;

switch (++$call) {
case 1:
case 2:
case 4:
14 changes: 7 additions & 7 deletions Tests/Middleware/HandleMessageMiddlewareTest.php
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ public function testItKeysTheHandlerFailedNestedExceptionsByHandlerDescription()
{
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
$handler = new class() {
$handler = new class {
public function __invoke()
{
throw new \Exception('failed');
@@ -100,23 +100,23 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps,

public static function itAddsHandledStampsProvider(): iterable
{
$first = new class() extends HandleMessageMiddlewareTestCallable {
$first = new class extends HandleMessageMiddlewareTestCallable {
public function __invoke()
{
return 'first result';
}
};
$firstClass = $first::class;

$second = new class() extends HandleMessageMiddlewareTestCallable {
$second = new class extends HandleMessageMiddlewareTestCallable {
public function __invoke()
{
return null;
}
};
$secondClass = $second::class;

$failing = new class() extends HandleMessageMiddlewareTestCallable {
$failing = new class extends HandleMessageMiddlewareTestCallable {
public function __invoke()
{
throw new \Exception('handler failed.');
@@ -199,7 +199,7 @@ public function testAllowNoHandlers()

public function testBatchHandler()
{
$handler = new class() implements BatchHandlerInterface {
$handler = new class implements BatchHandlerInterface {
public array $processedMessages;

use BatchHandlerTrait;
@@ -255,7 +255,7 @@ private function process(array $jobs): void

public function testBatchHandlerNoAck()
{
$handler = new class() implements BatchHandlerInterface {
$handler = new class implements BatchHandlerInterface {
use BatchHandlerTrait;

public function __invoke(DummyMessage $message, ?Acknowledger $ack = null)
@@ -290,7 +290,7 @@ private function process(array $jobs): void

public function testBatchHandlerNoBatch()
{
$handler = new class() implements BatchHandlerInterface {
$handler = new class implements BatchHandlerInterface {
public array $processedMessages;

use BatchHandlerTrait;
2 changes: 1 addition & 1 deletion Tests/Middleware/SendMessageMiddlewareTest.php
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ public function testItSendsTheMessageToAssignedSender()
/* @var SentStamp $stamp */
$this->assertInstanceOf(SentStamp::class, $stamp = $envelope->last(SentStamp::class), 'it adds a sent stamp');
$this->assertSame('my_sender', $stamp->getSenderAlias());
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
$this->assertSame($sender::class, $stamp->getSenderClass());
}

public function testItSendsTheMessageToMultipleSenders()
2 changes: 1 addition & 1 deletion Tests/Middleware/TraceableMiddlewareTest.php
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ public function testHandle()
$busId = 'command_bus';
$envelope = new Envelope(new DummyMessage('Hello'));

$middleware = new class() implements MiddlewareInterface {
$middleware = new class implements MiddlewareInterface {
public int $calls = 0;

public function handle(Envelope $envelope, StackInterface $stack): Envelope
2 changes: 1 addition & 1 deletion Tests/Stamp/TransportNamesStampTest.php
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ public function testGetSenders()
$configuredSenders = ['first_transport', 'second_transport', 'other_transport'];
$stamp = new TransportNamesStamp($configuredSenders);
$stampSenders = $stamp->getTransportNames();
$this->assertEquals(\count($configuredSenders), \count($stampSenders));
$this->assertSameSize($configuredSenders, $stampSenders);

foreach ($configuredSenders as $key => $sender) {
$this->assertSame($sender, $stampSenders[$key]);
59 changes: 59 additions & 0 deletions Tests/Transport/Sender/SendersLocatorTest.php
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageWithAttribute;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
@@ -53,6 +55,63 @@ public function testItReturnsTheSenderBasedOnTransportNamesStamp()
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}

/**
* @testWith ["\\Symfony\\Component\\Messenger\\Tests\\Fixtures\\DummyMessageWithAttribute", ["first_sender", "second_sender"]]
* ["\\Symfony\\Component\\Messenger\\Tests\\Fixtures\\DummyMessageWithParentWithAttribute", ["third_sender", "first_sender", "second_sender"]]
* ["\\Symfony\\Component\\Messenger\\Tests\\Fixtures\\DummyMessageWithInterfaceWithAttribute", ["first_sender", "third_sender", "second_sender"]]
*/
public function testItReturnsTheSenderBasedOnAsMessageAttribute(string $messageClass, array $expectedSenders)
{
$firstSender = $this->createMock(SenderInterface::class);
$secondSender = $this->createMock(SenderInterface::class);
$thirdSender = $this->createMock(SenderInterface::class);
$otherSender = $this->createMock(SenderInterface::class);
$sendersLocator = $this->createContainer([
'first_sender' => $firstSender,
'second_sender' => $secondSender,
'third_sender' => $thirdSender,
'other_sender' => $otherSender,
]);
$locator = new SendersLocator([], $sendersLocator);

$this->assertSame($expectedSenders, array_keys(iterator_to_array($locator->getSenders(new Envelope(new $messageClass('a'))))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}

public function testAsMessageAttributeIsOverridenByTransportNamesStamp()
{
$firstSender = $this->createMock(SenderInterface::class);
$secondSender = $this->createMock(SenderInterface::class);
$otherSender = $this->createMock(SenderInterface::class);
$sendersLocator = $this->createContainer([
'first_sender' => $firstSender,
'second_sender' => $secondSender,
'other_sender' => $otherSender,
]);
$locator = new SendersLocator([], $sendersLocator);

$this->assertSame(['other_sender' => $otherSender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessageWithAttribute('a'), [new TransportNamesStamp(['other_sender'])]))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}

public function testAsMessageAttributeIsOverridenByUserConfiguration()
{
$firstSender = $this->createMock(SenderInterface::class);
$secondSender = $this->createMock(SenderInterface::class);
$otherSender = $this->createMock(SenderInterface::class);
$sendersLocator = $this->createContainer([
'first_sender' => $firstSender,
'second_sender' => $secondSender,
'other_sender' => $otherSender,
]);
$locator = new SendersLocator([
DummyMessageInterface::class => ['other_sender'],
], $sendersLocator);

$this->assertSame(['other_sender' => $otherSender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessageWithAttribute('a')))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}

public function testSendersMapWithFallback()
{
$firstSender = $this->createMock(SenderInterface::class);
1 change: 0 additions & 1 deletion Tests/Transport/Serialization/SerializerTest.php
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
use Symfony\Component\Messenger\Stamp\ValidationStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
use Symfony\Component\Serializer\SerializerInterface as SerializerComponentInterface;

83 changes: 81 additions & 2 deletions Tests/WorkerTest.php
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
@@ -386,7 +387,7 @@ public function testWorkerLimitQueues()
->method('get')
;

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus = $this->createMock(MessageBusInterface::class);

$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
@@ -400,7 +401,7 @@ public function testWorkerLimitQueuesUnsupported()
$receiver1 = $this->createMock(QueueReceiverInterface::class);
$receiver2 = $this->createMock(ReceiverInterface::class);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus = $this->createMock(MessageBusInterface::class);

$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus, clock: new MockClock());
$this->expectException(RuntimeException::class);
@@ -603,6 +604,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()

$this->assertGreaterThan(0, $gcStatus['runs']);
}

/**
* @requires extension pcntl
*/
public function testKeepalive()
{
ClockMock::withClockMock(false);

$expectedEnvelopes = [
new Envelope(new DummyMessage('Hey')),
new Envelope(new DummyMessage('Bob')),
];

$receiver = new DummyKeepaliveReceiver([
[$expectedEnvelopes[0]],
[$expectedEnvelopes[1]],
]);

$handler = new DummyBatchHandler(3);

$middleware = new HandleMessageMiddleware(new HandlersLocator([
DummyMessage::class => [new HandlerDescriptor($handler)],
]));

$bus = new MessageBus([$middleware]);

$dispatcher = new EventDispatcher();
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) {
static $i = 0;
if (1 < ++$i) {
$event->getWorker()->stop();
$this->assertSame(2, $receiver->getAcknowledgeCount());
} else {
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
});

$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());

try {
$oldAsync = pcntl_async_signals(true);
pcntl_signal(\SIGALRM, fn () => $worker->keepalive(2));
pcntl_alarm(2);

$worker->run();
} finally {
pcntl_async_signals($oldAsync);
pcntl_signal(\SIGALRM, \SIG_DFL);
}

$this->assertCount(2, $receiver->keepaliveEnvelopes);
$this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes);

$receiver->keepaliveEnvelopes = [];
$worker->keepalive(2);

$this->assertCount(0, $receiver->keepaliveEnvelopes);
}
}

class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
@@ -613,12 +672,26 @@ public function getFromQueues(array $queueNames): iterable
}
}

class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverInterface
{
public array $keepaliveEnvelopes = [];

public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
$this->keepaliveEnvelopes[] = $envelope;
}
}

class DummyBatchHandler implements BatchHandlerInterface
{
use BatchHandlerTrait;

public array $processedMessages;

public function __construct(private ?int $delay = null)
{
}

public function __invoke(DummyMessage $message, ?Acknowledger $ack = null)
{
return $this->handle($message, $ack);
@@ -633,6 +706,12 @@ private function process(array $jobs): void
{
$this->processedMessages = array_column($jobs, 0);

if (null !== $this->delay) {
for ($i = 0; $i < $this->delay; ++$i) {
sleep(1);
}
}

foreach ($jobs as [$job, $ack]) {
$ack->ack($job);
}
7 changes: 3 additions & 4 deletions TraceableMessageBus.php
Original file line number Diff line number Diff line change
@@ -16,12 +16,11 @@
*/
class TraceableMessageBus implements MessageBusInterface
{
private MessageBusInterface $decoratedBus;
private array $dispatchedMessages = [];

public function __construct(MessageBusInterface $decoratedBus)
{
$this->decoratedBus = $decoratedBus;
public function __construct(
private MessageBusInterface $decoratedBus,
) {
}

public function dispatch(object $message, array $stamps = []): Envelope
2 changes: 1 addition & 1 deletion Transport/InMemory/InMemoryTransport.php
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ public function send(Envelope $envelope): Envelope
/** @var DelayStamp|null $delayStamp */
if ($delayStamp = $envelope->last(DelayStamp::class)) {
$now = $this->clock?->now() ?? new \DateTimeImmutable();
$this->availableAt[$id] = $now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000));
$this->availableAt[$id] = $now->modify(\sprintf('+%d seconds', $delayStamp->getDelay() / 1000));
}

return $envelope;
27 changes: 27 additions & 0 deletions Transport/Receiver/KeepaliveReceiverInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Receiver;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;

interface KeepaliveReceiverInterface extends ReceiverInterface
{
/**
* Informs the transport that the message is still being processed to avoid a timeout on the transport's side.
*
* @param int|null $seconds The minimum duration the message should be kept alive
*
* @throws TransportException If there is an issue communicating with the transport
*/
public function keepalive(Envelope $envelope, ?int $seconds = null): void;
}
34 changes: 33 additions & 1 deletion Transport/Sender/SendersLocator.php
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Sender;

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Attribute\AsMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
@@ -45,6 +46,7 @@ public function getSenders(Envelope $envelope): iterable
}

$seen = [];
$found = false;

foreach (HandlersLocator::listTypes($envelope) as $type) {
if (str_ends_with($type, '*') && $seen) {
@@ -58,15 +60,45 @@ public function getSenders(Envelope $envelope): iterable
$seen[] = $senderAlias;

yield from $this->getSenderFromAlias($senderAlias);
$found = true;
}
}
}

// Let the configuration-driven map upper override message attributes,
// this allows environment-specific configuration overriding hardcoded
// transport name.
if ($found) {
return;
}

foreach ($this->getTransportNamesFromAttribute($envelope) as $senderAlias) {
yield from $this->getSenderFromAlias($senderAlias);
}
}

private function getTransportNamesFromAttribute(Envelope $envelope): array
{
$transports = [];
$messageClass = $envelope->getMessage()::class;

foreach ([$messageClass] + class_parents($messageClass) + class_implements($messageClass) as $class) {
foreach ((new \ReflectionClass($class))->getAttributes(AsMessage::class, \ReflectionAttribute::IS_INSTANCEOF) as $refAttr) {
$asMessage = $refAttr->newInstance();

if ($asMessage->transport) {
$transports = array_merge($transports, (array) $asMessage->transport);
}
}
}

return $transports;
}

private function getSenderFromAlias(string $senderAlias): iterable
{
if (!$this->sendersLocator->has($senderAlias)) {
throw new RuntimeException(sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias));
throw new RuntimeException(\sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias));
}

yield $senderAlias => $this->sendersLocator->get($senderAlias);
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ final class FlattenExceptionNormalizer implements DenormalizerInterface, Normali

public function normalize(mixed $object, ?string $format = null, array $context = []): array
{
$normalized = [
return [
'message' => $object->getMessage(),
'code' => $object->getCode(),
'headers' => $object->getHeaders(),
@@ -41,8 +41,6 @@ public function normalize(mixed $object, ?string $format = null, array $context
'trace' => $object->getTrace(),
'trace_as_string' => $object->getTraceAsString(),
];

return $normalized;
}

public function getSupportedTypes(?string $format): array
2 changes: 1 addition & 1 deletion Transport/Serialization/PhpSerializer.php
Original file line number Diff line number Diff line change
@@ -118,6 +118,6 @@ private function safelyUnserialize(string $contents): Envelope
*/
public static function handleUnserializeCallback(string $class): never
{
throw new MessageDecodingFailedException(sprintf('Message class "%s" not found during decoding.', $class));
throw new MessageDecodingFailedException(\sprintf('Message class "%s" not found during decoding.', $class));
}
}
4 changes: 2 additions & 2 deletions Transport/Serialization/Serializer.php
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public function __construct(
public static function create(): self
{
if (!class_exists(SymfonySerializer::class)) {
throw new LogicException(sprintf('The "%s" class requires Symfony\'s Serializer component. Try running "composer require symfony/serializer" or use "%s" instead.', __CLASS__, PhpSerializer::class));
throw new LogicException(\sprintf('The "%s" class requires Symfony\'s Serializer component. Try running "composer require symfony/serializer" or use "%s" instead.', __CLASS__, PhpSerializer::class));
}

$encoders = [new XmlEncoder(), new JsonEncoder()];
@@ -173,7 +173,7 @@ private function getMimeTypeForFormat(): ?string
'json' => 'application/json',
'xml' => 'application/xml',
'yml',
'yaml' => 'application/x-yaml',
'yaml' => 'application/yaml',
'csv' => 'text/csv',
default => null,
};
30 changes: 30 additions & 0 deletions Worker.php
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\RateLimiter\LimiterInterface;
@@ -47,6 +48,10 @@ class Worker
private WorkerMetadata $metadata;
private array $acks = [];
private \SplObjectStorage $unacks;
/**
* @var \SplObjectStorage<object, array{0: string, 1: Envelope}>
*/
private \SplObjectStorage $keepalives;

/**
* @param ReceiverInterface[] $receivers Where the key is the transport name
@@ -63,6 +68,7 @@ public function __construct(
'transportNames' => array_keys($receivers),
]);
$this->unacks = new \SplObjectStorage();
$this->keepalives = new \SplObjectStorage();
}

/**
@@ -105,6 +111,10 @@ public function run(array $options = []): void
foreach ($envelopes as $envelope) {
$envelopeHandled = true;

if ($receiver instanceof KeepaliveReceiverInterface) {
$this->keepalives[$envelope->getMessage()] = [$transportName, $envelope];
}

$this->rateLimit($transportName);
$this->handleMessage($envelope, $transportName);
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));
@@ -186,6 +196,7 @@ private function ack(): bool
if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
// redelivered messages are rejected first so that continuous failures in an event listener or while
// publishing for retry does not cause infinite redelivery loops
unset($this->keepalives[$envelope->getMessage()]);
$receiver->reject($envelope);
}

@@ -199,6 +210,7 @@ private function ack(): bool
$envelope = $failedEvent->getEnvelope();

if (!$rejectFirst) {
unset($this->keepalives[$envelope->getMessage()]);
$receiver->reject($envelope);
}

@@ -218,6 +230,7 @@ private function ack(): bool
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
}

unset($this->keepalives[$envelope->getMessage()]);
$receiver->ack($envelope);
}

@@ -278,6 +291,23 @@ public function stop(): void
$this->shouldStop = true;
}

public function keepalive(?int $seconds): void
{
foreach ($this->keepalives as $message) {
[$transportName, $envelope] = $this->keepalives[$message];

if (!$this->receivers[$transportName] instanceof KeepaliveReceiverInterface) {
throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, KeepaliveReceiverInterface::class));
}

$this->logger?->info('Sending keepalive request.', [
'transport' => $transportName,
'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(),
]);
$this->receivers[$transportName]->keepalive($envelope, $seconds);
}
}

public function getMetadata(): WorkerMetadata
{
return $this->metadata;
8 changes: 3 additions & 5 deletions WorkerMetadata.php
Original file line number Diff line number Diff line change
@@ -16,11 +16,9 @@
*/
final class WorkerMetadata
{
private array $metadata;

public function __construct(array $metadata)
{
$this->metadata = $metadata;
public function __construct(
private array $metadata,
) {
}

public function set(array $newMetadata): void
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
@@ -18,11 +18,12 @@
"require": {
"php": ">=8.2",
"psr/log": "^1|^2|^3",
"symfony/clock": "^6.4|^7.0"
"symfony/clock": "^6.4|^7.0",
"symfony/deprecation-contracts": "^2.5|^3"
},
"require-dev": {
"psr/cache": "^1.0|^2.0|^3.0",
"symfony/console": "^6.4|^7.0",
"symfony/console": "^7.2",
"symfony/dependency-injection": "^6.4|^7.0",
"symfony/event-dispatcher": "^6.4|^7.0",
"symfony/http-kernel": "^6.4|^7.0",
@@ -36,7 +37,7 @@
"symfony/validator": "^6.4|^7.0"
},
"conflict": {
"symfony/console": "<6.4",
"symfony/console": "<7.2",
"symfony/event-dispatcher": "<6.4",
"symfony/event-dispatcher-contracts": "<2.5",
"symfony/framework-bundle": "<6.4",