Skip to content

Commit

Permalink
Implement Filesystem transport
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Jan 18, 2017
0 parents commit 69ee1f1
Show file tree
Hide file tree
Showing 27 changed files with 2,875 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
21 changes: 21 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
sudo: false

git:
depth: 1

language: php

php:
- '5.6'
- '7.0'

cache:
directories:
- $HOME/.composer/cache

install:
- composer self-update
- composer install --prefer-source

script:
- vendor/bin/phpunit --exclude-group=functional
179 changes: 179 additions & 0 deletions Client/FsDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
<?php

namespace Enqueue\Fs\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Fs\FsContext;
use Enqueue\Fs\FsDestination;
use Enqueue\Fs\FsMessage;
use Enqueue\Psr\Message as TransportMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class FsDriver implements DriverInterface
{
/**
* @var FsContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @param FsContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(FsContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$topic = $this->createRouterTopic();
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($topic, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$log = function ($text, ...$args) use ($logger) {
$logger->debug(sprintf('[FsDriver] '.$text, ...$args));
};

// setup router
$routerTopic = $this->createRouterTopic();
$routerQueue = $this->createQueue($this->config->getRouterQueueName());

$log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo());
$this->context->declareDestination($routerTopic);

$log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo());
$this->context->declareDestination($routerQueue);

// setup queues
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
$queue = $this->createQueue($meta->getClientName());

$log('Declare processor queue "%s" file: %s', $queue->getQueueName(), $queue->getFileInfo());
$this->context->declareDestination($queue);
}
}

/**
* {@inheritdoc}
*
* @return FsDestination
*/
public function createQueue($queueName)
{
return $this->context->createQueue($this->config->createTransportQueueName($queueName));
}

/**
* {@inheritdoc}
*
* @return FsMessage
*/
public function createTransportMessage(Message $message)
{
$properties = $message->getProperties();

$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($properties);
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());

return $transportMessage;
}

/**
* @param FsMessage $message
*
* {@inheritdoc}
*/
public function createClientMessage(TransportMessage $message)
{
$clientMessage = new Message();

$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setContentType($message->getHeader('content_type'));
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setPriority(MessagePriority::NORMAL);

return $clientMessage;
}

/**
* @return Config
*/
public function getConfig()
{
return $this->config;
}

/**
* @return FsDestination
*/
private function createRouterTopic()
{
return $this->context->createTopic(
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
);
}
}
35 changes: 35 additions & 0 deletions FsConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Enqueue\Fs;

use Enqueue\Psr\ConnectionFactory;

class FsConnectionFactory implements ConnectionFactory
{
/**
* @var string
*/
private $config;

/**
* @param array $config
*/
public function __construct(array $config)
{
$this->config = array_replace([
'store_dir' => null,
'pre_fetch_count' => 1,
'chmod' => 0600,
], $config);
}

/**
* {@inheritdoc}
*
* @return FsContext
*/
public function createContext()
{
return new FsContext($this->config['store_dir'], $this->config['pre_fetch_count'], $this->config['chmod']);
}
}
Loading

0 comments on commit 69ee1f1

Please sign in to comment.