|
一、安装:
1.安装rabbitmq运行环境
1)下载地址:https://www.erlang.org/downloads
2)下载完成之后,安装,直接下一步、下一步。
3)安装完成或,配置环境变量。
变量名:ERLANG_HOME
变量值:根据你安装的路径,如果都是下一步的话路径就是:C:\Program Files\erl-23.0

配置path路径:

2.安装rabbitmq。
1.下载地址:https://www.rabbitmq.com/download.html。
2)下载完成之后,安装,直接下一步、下一步。
3)进入安装路径,如果是直接下一步的话,路径是:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.6\sbin,执行rabbitmq-plugins enable rabbitmq_management开启管理界面

4)启动。

点击右键,用管理员的身份运行。
5)然后再浏览器中输入:http://localhost:15672/#/exchanges,地址,就可以看到如下界面了

6)输入rabbitmq的初始密码:Username:guest,Password:guest,就可以看到如下界面了:

7)rabbitmq登录进来,新建一个用户。

比如我新建的 Username:tianbin_test ,password:tianbin_test
8)选择标签(选Admin)

9)用户添加成功。

10)没有虚拟主机,新建一个虚拟主机

11)我这里填写的

12)添加成功的界面如下:

13)设置权限



14)查看权限

15)over。
二.rabbitmq的五种模式。
1.简单模式

说明:
1.生产者将消息交给默认的交换器(AMQP default) 2 .交换器获取消息后交给绑定这个生产者的队列(关系是通过队列名称完成) 3.监听当前队列的消费者获取消息,执行消费逻辑
使用:
1)使用tianbin_test用户登录。

2)设置权限

3).进入某个文件夹
在控制台中执行:composer init
然后再执行 :composer require php-amqplib/php-amqplib
代码结果如下:

新建simple文件夹和文件,最终结果如下:

simple文件夹中customer文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;simple_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange,false,false, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
simple文件夹中producter文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = &#39;simple_query&#39;;
$connection = new AMQPStreamConnection(&#39;127.0.0.1&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$AMQPChannel = $connection->channel();
$AMQPChannel->queue_declare($exchange,false,false, false, false);
$message = &#39;简单模式&#39;;
$AMQPMessage = new AMQPMessage($message,array(&#39;content_type&#39; => &#39;text/plain&#39;));
$AMQPChannel->basic_publish($AMQPMessage,&#39;&#39;,$exchange);
$AMQPChannel->close();
$connection->close();
执行:php producter.php,结果如下:


执行:php customer.php,结果如下:

备注:如果要手动进行消息确认(就是确定该消息有没有被消费成功)
请使用如下代码:
customer代码如下
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;simple_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange,false,true, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
//这里抛异常,该消息不会被消费
$msg->delivery_info[&#39;channel&#39;]->basic_ack($msg->delivery_info[&#39;delivery_tag&#39;]);
};
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
producter代码如下
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = &#39;simple_query&#39;;
$connection = new AMQPStreamConnection(&#39;127.0.0.1&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$AMQPChannel = $connection->channel();
//$AMQPChannel->queue_declare($exchange,false,false, false, false);
$AMQPChannel->queue_declare($exchange,false,true, false, false);
$message = &#39;简单模式&#39;;
$AMQPMessage = new AMQPMessage($message, array(&#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$AMQPChannel->basic_publish($AMQPMessage,&#39;&#39;,$exchange);
$AMQPChannel->close();
$connection->close();
演示:
1)执行:php producter.php,结果如下:


2)修改customer,结果如下:

3)执行
php customer.php
4)查看

5)结果如下:

7)如果我将异常去掉,执行

发现该消息已经被消费了。
2.工作模式

说明:
- 生产者将消息交个交换机
- 交换机交给绑定的队列
- 队列由多个消费者同时监听,只有其中一个能够获取这一条消息,形成了资源的争抢,谁的资源空闲大,争抢到的可能越大;
使用:
新建work目录和文件,最终目录结果如下

2)代码如下:
work文件夹producter的文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = &#39;task_query&#39;;
$connection = new AMQPStreamConnection(&#39;127.0.0.1&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$AMQPChannel = $connection->channel();
$AMQPChannel->queue_declare($exchange,false,true, false, false);
$message = &#39;&#39;;
for ($i =0;$i<50;$i++){
$message= &#39;工作模式&#39;.(string)$i;
$AMQPMessage = new AMQPMessage($message,array(&#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$AMQPChannel->basic_publish($AMQPMessage,&#39;&#39;,$exchange);
}
$AMQPChannel->close();
$connection->close();
work文件夹customerOne的文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
require_once __DIR__ . &#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;task_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange, false, true, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
echo &#34; [x] Done\n&#34;;
sleep(10);
$msg->delivery_info[&#39;channel&#39;]->basic_ack($msg->delivery_info[&#39;delivery_tag&#39;]);
};
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
work文件夹customerTwo的文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;task_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange,false,true, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
echo &#34; [x] Done\n&#34;;
$msg->delivery_info[&#39;channel&#39;]->basic_ack($msg->delivery_info[&#39;delivery_tag&#39;]);
};
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
3)执行
php customerOne.php
php customerTwo.php
php producter.php
4)结果如下:


发现没有多劳多得。
6)修改代码
customerTwo的文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;task_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange,false,true, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
echo &#34; [x] Done\n&#34;;
$msg->delivery_info[&#39;channel&#39;]->basic_ack($msg->delivery_info[&#39;delivery_tag&#39;]);
};
$channel->basic_qos(null,1,null);//每个消费者只能处理一条信息
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
customerOne文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
require_once __DIR__ . &#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = &#39;task_query&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->queue_declare($exchange, false, true, false, false);
$callback = function ($msg) {
echo &#39; [x] Received &#39;, $msg->body, &#34;\n&#34;;
echo &#34; [x] Done\n&#34;;
sleep(10);
$msg->delivery_info[&#39;channel&#39;]->basic_ack($msg->delivery_info[&#39;delivery_tag&#39;]);
};
$channel->basic_qos(null,1,null);//每个消费者只能处理一条信息
$channel->basic_consume($exchange, &#39;&#39;, false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
7)执行
php customerOne.php
php customerTwo.php
php producter.php
8)结果如下:


9)over。
3.发布订阅模型

说明:
1 .生产者扔给交换机消息 2 .交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列 3 .每个队列可以有一个消费者,接收消息进行消费逻辑
使用:
1)新建fanout文件夹和文件,最终结构如下:

2)代码如下:
customerOne文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;fanout.exchange.test&#39;;
$queue_name = &#39;fanout.query1&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange);
$callback = function ($msg) {
echo &#39; [x] &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
customerTwo的文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;fanout.exchange.test&#39;;
$queue_name = &#39;fanout.query1&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange);
$callback = function ($msg) {
echo &#39; [x] &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
producter文件内容如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;fanout.exchange.test&#39;;
$queue_name = &#39;fanout.query1&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange);
$callback = function ($msg) {
echo &#39; [x] &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
3)执行
php customerOne.php
php customerTwo.php
php producter.php
4)结果如下:
producter结果:

customerOne结果如下:

customerTwo结果如下:

4.路由模式

说明:
1.生产者还是将消息发送给交换机,消息携带具体的路由key(routingKey) 2.交换机类型direct,将接收到的消息中的routingKey,比对与之绑定的队列的routingKey 3.消费者监听一个队列,获取消息,执行消费逻辑
使用:
1)新建direct文文件夹和文件,最终结果如下:

2)代码如下:
customerOne代码如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;fanout.exchange.test&#39;;
$queue_name = &#39;fanout.query1&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange);
$callback = function ($msg) {
echo &#39; [x] &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
customerTwo代码如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;fanout.exchange.test&#39;;
$queue_name = &#39;fanout.query2&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
$channel->queue_bind($queue_name, $exchange);
$callback = function ($msg) {
echo &#39; [x] &#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
producter代码如下:
<?php
require_once __DIR__ . &#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = &#39;fanout.exchange.test&#39;;
$connection = new AMQPStreamConnection(&#39;127.0.0.1&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$AMQPChannel = $connection->channel();
$AMQPChannel->exchange_declare($exchange, AMQPExchangeType::FANOUT,false , false, false);
$message = &#39;fanout,hello&#39;;
$AMQPMessage = new AMQPMessage($message);
$AMQPChannel->basic_publish($AMQPMessage, $exchange);
echo &#39; [x] Sent &#39;, $message, &#34;\n&#34;;
$AMQPChannel->close();
$connection->close();
3)最终结果如下:
producter如下:

customerTwo如下:

customerOne如下:

4)over。
5.主题 模式

说明:
1.生产端发送消息,消息携带具体的路由key 2 .交换机的类型topic 3 .队列绑定交换机不在使用具体的路由key而是一个范围值
*表示一个字符串(不能携带特殊符号) #表示任意字符串
使用:
1)新建topic文件夹和文件,最终结果如下:

2)代码如下:
customerOne代码如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;topic.exchange.test&#39;;
$queue_name = &#39;topic.query1&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$binding_keys = [];
$binding_keys[] = &#39;#&#39;;
$binding_keys[] = &#34;*&#34;;
$channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, $exchange, $binding_key);
}
$callback = function ($msg) {
echo &#39; [one] &#39;, $msg->delivery_info[&#39;routing_key&#39;], &#39;:&#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
customerTwo代码如下:
`customerOne`代码如下:<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;topic.exchange.test&#39;;
$queue_name = &#39;topic.query2&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$binding_keys = [];
$binding_keys[] = &#39;insert.*&#39;;
$channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, $exchange, $binding_key);
}
$callback = function ($msg) {
echo &#39; [one] &#39;, $msg->delivery_info[&#39;routing_key&#39;], &#39;:&#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
producter代码如下:
<?php
require_once __DIR__.&#39;/../vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = &#39;topic.exchange.test&#39;;
$queue_name = &#39;topic.query2&#39;;
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;tianbin_test&#39;, &#39;tianbin_test&#39;);
$channel = $connection->channel();
$binding_keys = [];
$binding_keys[] = &#39;insert.*&#39;;
$channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, false, false);
list($queue_name, ,) = $channel->queue_declare($queue_name, false, false, true, false);
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, $exchange, $binding_key);
}
$callback = function ($msg) {
echo &#39; [one] &#39;, $msg->delivery_info[&#39;routing_key&#39;], &#39;:&#39;, $msg->body, &#34;\n&#34;;
};
$channel->basic_consume($queue_name, &#39;&#39;, false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
3)最终结果如下:
producter结果如下:

customerOne结果如下:

customerTwo结果如下:

如果有错误的话,欢迎指出。 |
|