Skip to content

Commit 2924e40

Browse files
author
Bartosz Kubicki
committed
Init repository
0 parents  commit 2924e40

23 files changed

+1441
-0
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
.idea
2+
build/
3+
vendor/
4+
coverage/
5+
.env
6+
docker-compose.yml
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* File: RetryLimitOverflowResolverInterface.php
7+
*
8+
* @author Bartosz Kubicki bartosz.kubicki@lizardmedia.pl>
9+
* @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
10+
*/
11+
12+
namespace LizardMedia\MessageQueue\Api\Envelope;
13+
14+
use Magento\Framework\MessageQueue\EnvelopeInterface;
15+
use Magento\Framework\MessageQueue\QueueInterface;
16+
17+
/**
18+
* Interface RetryLimitOverflowResolverInterface
19+
* @package LizardMedia\MessageQueue\Api\Envelope
20+
*/
21+
interface RetryLimitOverflowResolverInterface
22+
{
23+
/**
24+
* @param EnvelopeInterface $envelope
25+
* @param string $queueName
26+
* @return bool
27+
*/
28+
public function isLimitReached(EnvelopeInterface $envelope, string $queueName): bool;
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* File: EnvelopeCallbackFactoryInterface.php
7+
*
8+
* @author Bartosz Kubicki bartosz.kubicki@lizardmedia.pl>
9+
* @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
10+
*/
11+
12+
namespace LizardMedia\MessageQueue\Api\Queue\Consumer;
13+
14+
use InvalidArgumentException;
15+
use LizardMedia\MessageQueue\Queue\Consumer\EnvelopeCallback\EnvelopeCallbackInterface;
16+
use Magento\Framework\MessageQueue\ConsumerConfigurationInterface as UsedConsumerConfig;
17+
18+
/**
19+
* Interface EnvelopeCallbackFactoryInterface
20+
* @package LizardMedia\MessageQueue\Api\Queue\Consumer
21+
*/
22+
interface EnvelopeCallbackFactoryInterface
23+
{
24+
/**
25+
* @param string $type
26+
* @param UsedConsumerConfig $usedConsumerConfiguration
27+
* @return EnvelopeCallbackInterface
28+
* @throws InvalidArgumentException
29+
*/
30+
public function create(string $type, UsedConsumerConfig $usedConsumerConfiguration): EnvelopeCallbackInterface;
31+
}

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
### 1.0.0 ###
2+
* custom implementation of `Magento\Framework\MessageQueue\ConsumerInterface` making possible injection of envelope callback,
3+
which allows to introduce custom message consumption easily without copy-paste of whole class
4+
* a few implementations of `LizardMedia\MessageQueue\Queue\Consumer\EnvelopeCallback\EnvelopeCallbackInterface`, each handling
5+
message in its specific way, including `x-death` parameters support
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* File: PutPoisonPillCommand.php
7+
*
8+
* @author Bartosz Kubicki bartosz.kubicki@lizardmedia.pl>
9+
* @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
10+
*/
11+
12+
namespace LizardMedia\MessageQueue\Console\Command;
13+
14+
use Exception;
15+
use Magento\Framework\Console\Cli;
16+
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillPutInterface;
17+
use Symfony\Component\Console\Command\Command;
18+
use Symfony\Component\Console\Input\InputInterface;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
21+
/**
22+
* Class PutPoisonPillCommand
23+
* @package LizardMedia\MessageQueue\Console\Command
24+
* @codeCoverageIgnore
25+
*/
26+
class PutPoisonPillCommand extends Command
27+
{
28+
/**
29+
* @var PoisonPillPutInterface
30+
*/
31+
private $poisonPillPut;
32+
33+
/**
34+
* PutPoisonPill constructor.
35+
* @param PoisonPillPutInterface $poisonPillPut
36+
* @param string|null $name
37+
*/
38+
public function __construct(PoisonPillPutInterface $poisonPillPut, string $name = null)
39+
{
40+
parent::__construct($name);
41+
$this->poisonPillPut = $poisonPillPut;
42+
}
43+
44+
/**
45+
* @return void
46+
*/
47+
protected function configure(): void
48+
{
49+
$this->setName('lm:queue:consumers:poison')
50+
->setDescription('Poison queue consumers');
51+
}
52+
53+
/**
54+
* @param InputInterface $input
55+
* @param OutputInterface $output
56+
* @return int
57+
* @throws Exception
58+
*/
59+
public function execute(InputInterface $input, OutputInterface $output): int
60+
{
61+
$poisonPillVersion = $this->poisonPillPut->put();
62+
$output->writeln('Queue consumers have been poisoned...');
63+
$output->writeln(sprintf('New Poison Pill Version: %s', $poisonPillVersion));
64+
return Cli::RETURN_SUCCESS;
65+
}
66+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* File: RetryLimitOverflowResolver.php
7+
*
8+
* @author Bartosz Kubicki bartosz.kubicki@lizardmedia.pl>
9+
* @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
10+
*/
11+
12+
namespace LizardMedia\MessageQueue\Envelope;
13+
14+
use Magento\Framework\MessageQueue\QueueInterface;
15+
use function array_key_exists;
16+
use LizardMedia\MessageQueue\Api\Envelope\RetryLimitOverflowResolverInterface;
17+
use Magento\Framework\MessageQueue\EnvelopeInterface;
18+
use PhpAmqpLib\Wire\AMQPTable;
19+
20+
/**
21+
* Class RetryLimitOverflowResolver
22+
* @package LizardMedia\MessageQueue\Envelope
23+
*/
24+
class RetryLimitOverflowResolver implements RetryLimitOverflowResolverInterface
25+
{
26+
/**
27+
* @var string
28+
*/
29+
private const APPLICATION_HEADERS_KEY = 'application_headers';
30+
31+
/**
32+
* @var string
33+
*/
34+
private const X_DEATH_KEY = 'x-death';
35+
36+
/**
37+
* @var string
38+
*/
39+
private const QUEUE_KEY = 'queue';
40+
41+
/**
42+
* @var string
43+
*/
44+
private const X_DEATH_COUNT_KEY = 'count';
45+
46+
/**
47+
* @var int
48+
*/
49+
private $limit;
50+
51+
/**
52+
* RetryLimitOverflowResolver constructor.
53+
* @param int $limit
54+
*/
55+
public function __construct(int $limit = 3)
56+
{
57+
$this->limit = $limit;
58+
}
59+
60+
/**
61+
* @param EnvelopeInterface $envelope
62+
* @param string $queueName
63+
* @return bool
64+
*/
65+
public function isLimitReached(EnvelopeInterface $envelope, string $queueName): bool
66+
{
67+
$xdeathParams = $this->getXDeathParameters($envelope, $queueName);
68+
return !empty($xdeathParams[self::X_DEATH_COUNT_KEY])
69+
&& (int) $xdeathParams[self::X_DEATH_COUNT_KEY] > $this->limit;
70+
}
71+
72+
/**
73+
* @param EnvelopeInterface $envelope
74+
* @param string $queueName
75+
* @return array
76+
*/
77+
private function getXDeathParameters(EnvelopeInterface $envelope, string $queueName): array
78+
{
79+
$properties = $envelope->getProperties();
80+
if (!array_key_exists(self::APPLICATION_HEADERS_KEY, $properties)) {
81+
return [];
82+
}
83+
84+
/** @var $applicationHeaders AMQPTable */
85+
$applicationHeaders = $properties[self::APPLICATION_HEADERS_KEY];
86+
if ($applicationHeaders === null) {
87+
return [];
88+
}
89+
90+
$data = $applicationHeaders->getNativeData();
91+
92+
if (empty($data[self::X_DEATH_KEY])) {
93+
return [];
94+
}
95+
96+
$xdeathParamsArray = $data[self::X_DEATH_KEY];
97+
98+
foreach ($xdeathParamsArray as $key => $queueXdeathParams) {
99+
if (isset($queueXdeathParams[self::QUEUE_KEY]) && $queueXdeathParams[self::QUEUE_KEY] === $queueName) {
100+
return $xdeathParamsArray[$key];
101+
}
102+
}
103+
104+
return [];
105+
}
106+
}

LICENSE.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2020 Lizard Media | UX & software house
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* File: ConsumerWithInjectableEnvelopeCallback.php
7+
*
8+
* @author Bartosz Kubicki bartosz.kubicki@lizardmedia.pl>
9+
* @copyright Copyright (C) 2020 Lizard Media (http://lizardmedia.pl)
10+
*/
11+
12+
namespace LizardMedia\MessageQueue\Queue\Consumer;
13+
14+
use Closure;
15+
use LizardMedia\MessageQueue\Api\Queue\Consumer\EnvelopeCallbackFactoryInterface;
16+
use Magento\Framework\MessageQueue\CallbackInvokerInterface;
17+
use Magento\Framework\MessageQueue\ConsumerConfigurationInterface as UsedConsumerConfig;
18+
use Magento\Framework\MessageQueue\ConsumerInterface;
19+
use Magento\Framework\MessageQueue\EnvelopeInterface;
20+
21+
/**
22+
* Class ConsumerWithInjectableEnvelopeCallback
23+
* @package LizardMedia\MessageQueue\Queue\Consumer
24+
* @SuppressWarnings(PHPMD.LongVariable)
25+
* @codeCoverageIgnore
26+
*/
27+
class ConsumerWithInjectableEnvelopeCallback implements ConsumerInterface
28+
{
29+
/**
30+
* @var string
31+
*/
32+
private $envelopeCallbackType;
33+
34+
/**
35+
* @var EnvelopeCallbackFactoryInterface
36+
*/
37+
private $envelopeCallbackFactory;
38+
39+
/**
40+
* @var CallbackInvokerInterface
41+
*/
42+
private $invoker;
43+
44+
/**
45+
* @var UsedConsumerConfig
46+
*/
47+
private $usedConsumerConfig;
48+
49+
/**
50+
* @param EnvelopeCallbackFactoryInterface $envelopeCallbackFactory
51+
* @param CallbackInvokerInterface $invoker
52+
* @param UsedConsumerConfig $configuration
53+
* @param string $envelopeCallbackType
54+
*/
55+
public function __construct(
56+
EnvelopeCallbackFactoryInterface $envelopeCallbackFactory,
57+
CallbackInvokerInterface $invoker,
58+
UsedConsumerConfig $configuration,
59+
string $envelopeCallbackType
60+
) {
61+
$this->envelopeCallbackType = $envelopeCallbackType;
62+
$this->envelopeCallbackFactory = $envelopeCallbackFactory;
63+
$this->invoker = $invoker;
64+
$this->usedConsumerConfig = $configuration;
65+
}
66+
67+
/**
68+
* @param null $maxNumberOfMessages
69+
* @return void
70+
*/
71+
public function process($maxNumberOfMessages = null): void
72+
{
73+
$queue = $this->usedConsumerConfig->getQueue();
74+
75+
if (!isset($maxNumberOfMessages)) {
76+
$queue->subscribe($this->getTransactionCallback($this->usedConsumerConfig));
77+
} else {
78+
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($this->usedConsumerConfig));
79+
}
80+
}
81+
82+
/**
83+
* @param UsedConsumerConfig $usedConsumerConfig
84+
* @return Closure
85+
*/
86+
protected function getTransactionCallback(UsedConsumerConfig $usedConsumerConfig): Closure
87+
{
88+
$callbackInstance = $this->envelopeCallbackFactory->create($this->envelopeCallbackType, $usedConsumerConfig);
89+
90+
return static function (EnvelopeInterface $message) use ($callbackInstance) {
91+
$callbackInstance->execute($message);
92+
};
93+
}
94+
}

0 commit comments

Comments
 (0)