1.什么是RMQ?
RMQ是一种消息中间件,以队列为基础利用生产者&消费者模型,来处理不同业务模块或系统中消息的接收和发布,可以很好的做到系统的解耦,生产者只管生成,消费者只管消费
同时,作为消息队列,它可以保持事务的一致性,并且有较好的吞吐量
RMQ安装1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
371.linux
需要下载安装包以及Erlang库
首先配置源
echo "deb https://dl.bintray.com/rabbitmq/debian trusty main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
echo "deb http://packages.erlang-solutions.com/ubuntu trusty contrib" | sudo tee -a /etc/apt/sources.list.d/erlang_solutions.list
导入对应的 key
wget -c -O- http://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc |sudo apt-key add -
开始安装 erlang 和 RabbitMQ
sudo apt-get update
sudo apt-get install erlang-nox
sudo apt-get install rabbitmq-server
安装完之后 RabbitMQ 便已经自动启动了,可以使用如下的命令对 RabbitMQ 进行操作:
sudo service rabbitmq-server start # 启动
sudo service rabbitmq-server stop # 停止
sudo service rabbitmq-server restart # 重启
sudo service rabbitmq-server status # 查看当前状态
配置 RabbitMQ
添加admin用户,密码设置为admin。
sudo rabbitmqctl add_user admin admin
赋予权限
sudo rabbitmqctl set_user_tags admin administrator
赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
管理 RabbitMQ
RabbitMQ 提供了一个 web 管理工具(rabbitmq_management),方便在浏览器端管理 RabbitMQ
sudo rabbitmq-plugins enable rabbitmq_management
之后在浏览器访问 [http://server-ip:15672/],账号与密码都是刚才设置的 admin
2.windows安装
要注意rmq和erlang的版本兼容性问题
https://cloud.tencent.com/developer/article/1582235
AMQP
RMQ的七种消息模式
1. 直连模式,不需要交换机1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83//消息生产者
public class Provider {
public void sendMessage() throws IOException, TimeoutException {
//创建连接mq的工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("zx");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接通道
Channel channel = connection.createChannel();
//通道参数绑定
//参数1:队列名程,如果队列不存在则自动创建
//参数2:用来定义队列是否需要持久化,true,false
//参数3:exclusive是否独占队列,独占则其他链接不可以访问
//参数4:autoDelete是否在消费完成后自动删除队列
//参数5:额外参数
channel.queueDeclare("hello",false,false,false,null);
//发布消息
//参数1:交换机名,参数2:队列名,参数3:传递消息的额外设置,参数4:消息具体内容
//可以通过参数三设置消息持久化,MessageProperties.PERSISENT_TEXT_PLAIN
channel.basicPublish("","hello",null,"hello zx".getBytes());
channel.close();
connection.close();
}
}
//消息消费者
public class Consumer{
public void getMessage() throws IOException, TimeoutException {
//创建工厂链接
//创建连接mq的工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("zx");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接通道
Channel channel = connection.createChannel();
//通道参数绑定
//参数1:队列名程,如果队列不存在则自动创建
//参数2:用来定义队列是否需要持久化,true,false
//参数3:exclusive是否独占队列,独占则其他链接不可以访问
//参数4:autoDelete是否在消费完成后自动删除队列
//参数5:额外参数
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数1:消费哪个队列的消息
//参数2:开始消息的自动确认机制
//参数3:消费的回调接口
channel.basicConsume("hello",true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费:"+ new String(body));
}
});
// channel.close();
// connection.close();
}
}
链接工具类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class RabbitMQConnection {
private static ConnectionFactory connectionFactory;
static {
//设置连接mq的主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("zx");
connectionFactory.setPassword("123456");
}
public static Connection geConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
if (channel != null) channel.close();
if (connection != null) connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
点对点的通信,用户登录加积分,或者发短信通知
2. 任务模式
a.生产者1
2
3
4
5
6
7
8
9
10
11public class Provider {
public void sendMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 100; i++) {
channel.basicPublish("", "work", null, ("hello zx" + i).getBytes());
}
RabbitMQConnection.closeConnectionAndChanel(channel, connection);
}
}
b.消费者1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class Consumer1 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费1:"+ new String(body));
}
});
}
}
public class Consumer2 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费2:"+ new String(body));
}
});
}
}
可以发现两个消费者是平均在消费产物
避免能力强的和能力弱的处理一样,效率太低,解决措施:能者多劳实现1
2
3
4
5
6
7
8
9在消费者中,
设置channel一次只能处理一个消息,防止消息堆积
channel.basicQos(1);
同时,要设置basicConsume的自动确认参数为false,等待没处理完成一个后,自动确认
channel.basicConsume("work",false,new DefaultConsumer{})
接下来,设置手动确认机制(参数1:手动确认标识,参数2:每次确认一个)
在handleDelivery中编写
channel.basicAck(envelope.getDeliveryTag(), false);
进一步保证消息可靠,不丢失,原先没有配置前,消费者是一股脑的从队列中获取多个消息,直接返回队列处理完成,然后挨个处理,若突然宕机,消息就丢失了,第二种处理是手动的一个一个获取,一个一个使用,然后在返回确认消息
- 广播模式fanout
a.生产者+交换机1
2
3
4
5
6
7
8
9
10
11
12
13
14public class Provider {
public void sendMessage() throws IOException {
//获取链接对象
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//给通道声明交换机:参数1:创建的交换机名程(有的话不会创建) 参数2:交换机类型 fanout 广播类型
channel.exchangeDeclare("logs", "fanout");
//生产消息
channel.basicPublish("logs", "", null, "fanout message".getBytes());
RabbitMQConnection.closeConnectionAndChanel(channel, connection);
}
}
b.消费者+队列+绑定交换交1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public class Consumer1 {
public void getMessage() throws IOException {
//获取链接
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列,也可以永久
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,s2路由key,先为空
channel.queueBind(queue,"logs","");
//消费
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
}
}
public class Consumer2 {
public void getMessage() throws IOException {
//获取链接
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列,也可以永久
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,s2路由key,先为空
channel.queueBind(queue,"logs","");
//消费
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+ new String(body));
}
});
}
}
不同模块可以同时接到相同的消息,去处理不同的功能
- Routing订阅模式
a.生产者+direct交换机+路由key1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class Provider {
public void sendMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//通过通道声明交换机,参数1:交换机名 参数2:direct 路由模式
channel.exchangeDeclare("logs_direct", "direct");
//路由key
String routeKey = "info";
//生产消息
channel.basicPublish("logs_direct", routeKey, null, ("基于direct模型发布的route key=" + routeKey).getBytes());
RabbitMQConnection.closeConnectionAndChanel(channel, connection);
}
}
b.消费者+绑定交换机+队列+路由key1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50public class Consumer1 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//声明交换机及交换类型
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
String routeKey = "error";
//基于route key绑定交换机和队列
channel.queueBind(queue,"logs_direct",routeKey);
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
public class Consumer2 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//声明交换机及交换类型
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
String routeKey1 = "error";
String routeKey2 = "info";
String routeKey3 = "warning";
//基于route key绑定交换机和队列
channel.queueBind(queue,"logs_direct",routeKey1);
channel.queueBind(queue,"logs_direct",routeKey2);
channel.queueBind(queue,"logs_direct",routeKey3);
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
生产者生产时会绑定不同的路由key,交换机根据key,去放入不同的队列,而消费者则根据绑定的key可以获取到特定的消息。不同业务模块可以处理对应业务,通过路由模式,而不是像第三种广播模式,生产者要向每个消费者发送
- Topic订阅模型
a.生产者+交换机+路由key1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class Provider {
public void sendMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//通过通道声明交换机,参数1:交换机名 参数2:direct 路由模式
channel.exchangeDeclare("Topics", "topic");
//路由key
String routeKey = "dao.user";
//生产消息
channel.basicPublish("Topics", routeKey, null, ("基于direct模型发布的route key=" + routeKey).getBytes());
RabbitMQConnection.closeConnectionAndChanel(channel, connection);
}
}
b.消费者+绑定交换机和路由key通配符1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class Consumer1 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//声明交换机及交换类型
channel.exchangeDeclare("Topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
String routeKey = "dao.*";
//基于route key绑定交换机和队列
channel.queueBind(queue,"Topics",routeKey);
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
public class Consumer2 {
public void getMessage() throws IOException {
Connection connection = RabbitMQConnection.geConnection();
Channel channel = connection.createChannel();
//声明交换机及交换类型
channel.exchangeDeclare("Topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
String routeKey = "dao.#";
//基于route key绑定交换机和队列
channel.queueBind(queue,"Topics",routeKey);
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
- RPC
我们的RPC将像这样工作:
a.对于RPC请求,客户端发送一条消息,该消息具有两个属性:replyTo(设置为仅为该请求创建的匿名互斥队列)和correlationId(设置为每个请求的唯一值)。
b.该请求被发送到rpc_queue队列。
c.RPC工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它会使用replyTo字段中的队列来完成工作并将带有结果的消息发送回客户端。
d.客户端等待答复队列中的数据。出现消息时,它会检查correlationId属性。如果它与请求中的值匹配,则将响应返回给应用程序。
- PC
SpringBoot整合RMQ
一对一直连
1.需要导入rmp-stater
2.编写配置1
2
3
4
5
6
7
8
9spring:
application:
name: rmq-springboot
rabbitmq:
host: localhost
port: 5672
username: zx
password: 123456
virtual-host: /ems
3.编写privoder1
2
3
4
5
6
7
8public class Provider {
private RabbitTemplate template;
public void sendMessage() {
template.convertAndSend("hello","hello,zx");
}
}
4.编写Consumer1
2
3
4
5
6
7
8
9
10
11
//可以指定队列是否持久化,是否独占,是否自动删除
"hello",durable = "true")) (queuesToDeclare = (value =
public class Consumer {
public void getMessage(String message){
System.out.println("message="+message);
}
}
工作模型(Work)
轮询接收,也可以能者多劳
1.provider1
2
3
4
5
6
7
8
9public class Provider {
RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend("work","hello,work");
}
}
2.consumer1
2
3
4
5
6
7
8
9
10
11
12
13
public class Consumer {
"work")) (queuesToDeclare = (
public void getMessage1(String message) {
System.out.println("message1:"+message);
}
"work")) (queuesToDeclare = (
public void getMessage2(String message) {
System.out.println("message2:"+message);
}
}
发布订阅–广播模式fanout
所有消费者都可以接收到
1.privoder1
2
3
4
5
6
7
8
9public class Provider {
RabbitTemplate rabbitTemplate;
public void sendMessage() {
//参数不同,对应不同模式
rabbitTemplate.convertAndSend("logs","","hello, fanout");
}
}
2consumer1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Consumer {
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "logs",type = "fanout") //绑定交换机 (value =
)
})
public void getMessage1(String message) {
System.out.println("message1:"+message);
}
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "logs",type = "fanout") //绑定交换机 (value =
)
})
public void getMessage2(String message) {
System.out.println("message2:"+message);
}
}
路由模式route
根据路由key获取指定的消息队列中的消息
1.privoder1
2
3
4
5
6
7
8
9public class Provider {
RabbitTemplate rabbitTemplate;
public void sendMessage() {
//参数不同,对应不同模式
rabbitTemplate.convertAndSend("directs","info","hello, driect");
}
}
2.consumer1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Consumer {
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "directs",type = "direct"), //绑定交换机 (value =
key = {"info","warning","log"}
)
})
public void getMessage1(String message) {
System.out.println("message1:"+message);
}
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "directs",type = "direct"), //绑定交换机 (value =
key = {"warning","log"}
)
})
public void getMessage2(String message) {
System.out.println("message2:"+message);
}
}
Topics模式,动态路由,订阅模式
1.privoder1
2
3
4
5
6
7
8
9public class Provider {
RabbitTemplate rabbitTemplate;
public void sendMessage() {
//参数不同,对应不同模式
rabbitTemplate.convertAndSend("topics","user.add","hello, topics");
}
}
2.consumer1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Consumer {
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "topics",type = "topic"), //绑定交换机 (value =
key = {"user.*","product.#"}
)
})
public void getMessage1(String message) {
System.out.println("message1:"+message);
}
(bindings = {
(
value = //如果什么都不指定,则是一个临时队列 ,
exchange = "topics",type = "topic"), //绑定交换机 (value =
key = {"user"}
)
})
public void getMessage2(String message) {
System.out.println("message2:"+message);
}
}
RMQ应用场景
异步处理
应用解耦
流量削峰
RMQ集群
1.普通集群(副本集群)
主从复制,只同步交换机数据,从节点并不会复制主节点上的原队列数据,只是帮助减轻主节点的压力,从节点只能看到从节点上的信息,并不会保存队列,消费者可以从从节点拿消息,但是一但master节点挂了,系统就不可以正常工作了,会有消息丢失。
主节点要是持久化了消息,从节点可以备份,但是主节点挂了,从节点是不可以使用的
集群搭建步骤:
需要三台机器,为了方便操作去修改主机名,配置ip,三台机器都要配置映射
2.镜像集群
会复制主节点所有信息,主节点挂了,从节点会补上
集群搭建步骤
在普通集群的基础上,在任意一台机器上添加策略之后,就会变成镜像集群