RabbitMQ是实现了高级消息队列协议的开源消息代理软件。最近使用RabbitMQ做消息队列,留下Demo。
先在服务器上安装RabbitMQ,并确认php安装了对应版本的amqp拓展。
demo如下:
queue.php
/**
* 消息队列demo
*/
class Queue{
/* 连接 */
private static $connection;
private static $connect;
/* 频道 */
private static $channel = [];
/* 交换机 */
private static $exchange = [];
/* 队列 */
private static $queue = [];
private static $instance = null;
private static $queueConfig = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
private function __construct()
{
$queueConfig = self::$queueConfig;
if(class_exists('AMQPConnection')){
self::$connection = new \AMQPConnection($queueConfig);
self::$connect = self::$connection->connect();
date_default_timezone_set("Asia/Shanghai");
}else{
echo '没有安装AMQP拓展';
}
}
/**
* 创建单例
* @return [object] Queue
*/
public static function instance()
{
if(isset(self::$instance) && self::$instance instanceof self){
return self::$instance;
}
self::$instance = new self();
return self::$instance;
}
private function __clone(){}
public function __set($name, $value)
{
$this->$name = $value;
return $this;
}
public function __get($name)
{
return isset($this->$name) ? $this->$name : null;
}
/**
* 获取交换机实例
* @param [string] $exName 交换机名称
* @param [string] $channel 频道名称
* @return [object] 交换机实例
*/
public function getExchange($exName, $channel)
{
$index = $exName . '_' . $channel;
if(isset(self::$exchange[$index]) && self::$exchange[$index]){
return self::$exchange[$index];
}
$channel = $this->getChannel($channel);
$exchange = new \AMQPExchange($channel);
$exchange->setName($exName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declare();
self::$exchange[$index] = $exchange;
return $exchange;
}
/**
* 获取频道实例
* @param [string] $channelName 频道名称
* @return [object] 频道实例
*/
public function getChannel($channelName)
{
if(isset(self::$channel[$channelName]) && self::$channel[$channelName]){
return self::$channel[$channelName];
}
/* 创建新频道 */
$channel = new \AMQPChannel(self::$connection);
self::$channel[$channelName] = $channel;
return $channel;
}
/**
* 获取队列实例
* @param [string] $queueName 队列名称
* @param [string] $channel 频道名称
* @return [object] 队列实例
*/
public function getQueue($queueName, $channel)
{
$index = $queueName . '_' . $channel;
if(isset(self::$queue[$index]) && self::$queue[$index]){
return self::$queue[$index];
}
$channel = $this->getChannel($channel);
/* 创建新队列 */
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
/* 持久 */
$queue->setFlags(AMQP_DURABLE);
$queue->declare();
self::$queue[$index] = $queue;
return $queue;
}
/**
* 发布消息
* @param [string] $message 消息
* @param [string] $exchange 交换机名称
* @param [string] $channel 频道名称
* @param [string] $routeKey 路由键
* @return [boolean] $result 发布是否成功
*/
public function publish($message, $exchange, $channel, $routeKey)
{
$exchange = $this->getExchange($exchange, $channel);
$result = $exchange->publish($message, $routeKey);
return $result;
}
/**
* 接受消息
* @param [string] $queue 队列名称
* @param [string] $exName 交换机名称
* @param [string] $channel 频道名称
* @param [string] $routeKey 路由键
* @param [function] $callback ($envelope, $queue)
*/
public function consumer($queue, $exName, $channel, $routeKey, $callback)
{
$queue = $this->getQueue($queue, $channel);
$exchange = $this->getExchange($exName, $channel);
$queue->bind($exName, $routeKey);
while (true) {
$queue->consume($callback);
}
}
}
consumer.php
require 'queue.php';
$instance = Queue::instance();
$instance->consumer('test', 'test', 'test', 'test', function($envelope, $queue){
$message = $envelope->getBody();
if($message){
echo 'Message:' . $message;
/* 应答 */
$queue->ack($envelope->getDeliveryTag());
}
});
publisher.php
require 'queue.php';
$instance = Queue::instance();
for($i = 1; $i <= 10; $i++){
echo '发布测试数据:' . $i . "\n";
$instance->publish($i ."\n" , 'test' ,'test','test');
}