Application for simplifying using message queues
Library needs 2 configuration options to by specified by you. How to specify them depends on your service container (see Acessing to the services section).
- imatic_notification_params
- parameters of the connection to the broker
<?php
$params = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'namespace' => '',
];- imatic_notification.logger
- logger implementing interface of psr log
- in our examples below, we did use NullLogger, which will log nothing - we highly discourage from usage if this logger in production since you'll have no Idea what went wrong if something wrong happend
You will work with 2 interfaces Publisher for publishing messages into queues and Consumer to consume messages published by publisher.
<?php
namespace Imatic\Notification;
interface Connection
{
public function createPublisher(ChannelParams $params);
public function createConsumer(ChannelParams $params);
}
interface Publisher
{
public function publish(Message $message, $key = '');
}
interface Consumer
{
public function consume($queueName, $key, callable $callback);
public function wait();
public function waitN($n);
}These 2 interfaces are implemented by service imatic_notification.connection
It is advised to use one container to create connection object for you from definition file "config/services.yml". Below you can see 2 of many possible ways to go.
To load services for Symfony, you need to edit your your app/config/config.yml
imports:
- { resource: ../../vendor/imatic/notification/config/services.yml }
parameters:
imatic_notification_params: []
services:
imatic_notification.logger:
class: Psr\Log\NullLoggerAnd then you can access to the services from your Symfony container
<?php
$connection = $this->container->get('imatic_notification.connection');<?php
// create connection to the broker
$connection = $this->container->get('imatic_notification.connection');
// create channel parameters
$channelParams = new ChannelParams($exchange = 'imatic_queue_test');
// create consumer
$consumer = $connection->createConsumer($channelParams);
// listen to the messages on queue "queue_name"
// to all messages having routing key "routing_key"
$consumer->consume('queue_name', 'routing_key', function (Message $msg) {
$this->logger->logData('data');
// you need to return true to tell the broker that it can discard the messaga
// because you successfully processed it
return true;
});
// create publisher
$publisher = $connection->createPublisher($channelParams);
// publish message to the channel with routing key "routing_key"
$publisher->publish(new Message(['data' => 'bdy']), 'routing_key');
// consume only 1 message, then continue
$consumer->waitN(1);
// won't return till you have listening consumers
$consumer->wait();