此实例结合Springboot来实现消息的发布与订阅,包含说明以及扩展。
1.配置文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53server:
port: 8001
spring:
application:
name: rmq-productor
spring:
rabbitmq:
username: guest
password: guest
port: 5672
# spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集群或单机 都可配置
addresses: 127.0.0.1
virtual-host: /
# 发送方
# 开启发送失败返回
publisher-returns: true
# 开启发送确认
publisher-confirms: true
# 消费方
listener:
# 开启手动ACK(坑:当序列化为JSON时,此配置会失效)
simple:
# 指定最小的消费者数量
concurrency: 3
# 指定最大的消费者数量
max-concurrency: 3
# 开启ack确认,auto根据情况确认,manual手动,none 发送消息后直接确认消息
acknowledge-mode: manual
#消费者每次从队列获取的消息数量 (默认一次250个)
#通过查看后台管理器中queue的unacked数量
prefetch: 5
# 消费者自启动
auto-startup: true
# 消费失败,重新入队
default-requeue-rejected: true
retry:
enabled: true
# 开启ack
direct:
acknowledge-mode: manual
# 支持消息的确认和返回
template:
mandatory: true
# 启用发送重试
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 1.0
# 连接超时时间
connection-timeout: 10s
2.配置类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112import com.zhaixin.constants.RabbitMqConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
public class RabbitMqConfig {
"${spring.rabbitmq.host}") (
private String springRabbitmqHost;
"${spring.rabbitmq.port}") (
private int springRabbitmqPort;
"${spring.rabbitmq.username}") (
private String springRabbitmqUsername;
"${spring.rabbitmq.password}") (
private String springRabbitmqPassword;
"${spring.rabbitmq.publisher-confirms}") (
private boolean springRabbitmqPublisher_confirms;
"${spring.rabbitmq.virtual-host}") (
private String springRabbitmqVirtual_host;
//rmq连接池
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(springRabbitmqHost);
cachingConnectionFactory.setPort(springRabbitmqPort);
cachingConnectionFactory.setUsername(springRabbitmqUsername);
cachingConnectionFactory.setPassword(springRabbitmqPassword);
cachingConnectionFactory.setPublisherConfirms(springRabbitmqPublisher_confirms);
cachingConnectionFactory.setVirtualHost(springRabbitmqVirtual_host);
return cachingConnectionFactory;
}
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
rabbitTemplate.setReturnCallback(msgSendReturnCallBack());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//rabbitTemplate.setMandatory(true); 配置文件中设置了,这里就不再设置
return rabbitTemplate;
}
//生产者消息确认机制
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
public MsgSendReturnCallBack msgSendReturnCallBack() {
return new MsgSendReturnCallBack();
}
// 消息队列及交换机的绑定
Queue rabbitmqQueue() {
return new Queue(RabbitMqConstant.QUEUE_NAME, true);
}
DirectExchange rabbitmqExchange() {
return new DirectExchange(RabbitMqConstant.EXCHANGE_NAME, true, false);
}
Binding rabbitmqBinding() {
return BindingBuilder.bind(rabbitmqQueue()).to(rabbitmqExchange()).with(RabbitMqConstant.ROUTING_KEY);
}
//消费者配置,由于配置文件中配置了,这里可以不写
// @Bean
// public SimpleMessageListenerContainer messageListenerContainer(){
// SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
// container.setConnectionFactory(connectionFactory());
// container.setConcurrentConsumers(3);
// container.setMaxConcurrentConsumers(10);
// return container;
// }
//解决Springboot集成rabbitmq序列化出问题
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
MsgSendConfirmCallBack1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息id:"+ correlationData);
if(ack) {
System.out.println("消息发送到交换机成功");
}else {
System.out.println("消息发送到交换机失败");
}
}
}
MsgSendReturnCallBack1
2
3
4
5
6
public class MsgSendReturnCallBack implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息从交换机到队列失败");
}
}
如果在rabbitmq的配置类rabbitmqtempalte中,不设置messageConverter的话,可以设置全局的converter1
2
3
4
5
6
7
8
9
10
11
12import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// 全局配置converter
//@Configuration
public class MessagesConverter {
MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
同样的,对于生产者的消息确认,也可以直接写在RabbitTemplate中,使用lamda表达式的方式,例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if (ack) {
logger.info(msgId + ":消息发送成功");
mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功
} else {
logger.info(msgId + ":消息发送失败");
}
});
rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {
logger.info("消息发送失败");
});
return rabbitTemplate;
}
之后就可以利用convertandsend发送消息到交换机,然后再到队列1
2
3
4
5
6
7
8
9
10
11
12
13
public class RabbitmqController {
RabbitTemplate template;
"/test") (
public String message() {
String msgId = UUID.randomUUID().toString();
template.convertAndSend(RabbitMqConstant.EXCHANGE_NAME,RabbitMqConstant.ROUTING_KEY,"value",new CorrelationData(msgId));
return "success";
}
}
消费方可以通过如下代码监听获取队列中的消息,并确认1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"test",durable = "true")) (queuesToDeclare = (value =
public class RabbitMqMessage {
public void process(Object ojb, Channel channel, Message message) throws IOException {
System.out.println("TestReceiver1:我被消费了:"+ojb);
try{
System.out.println("TestReceiver1 message:"+message.getMessageProperties()+":"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("TestReceiver1-消息已确认!!!");
}catch(Exception e){
e.printStackTrace();//TODO 业务处理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
System.out.println("TestReceiver1-重新放入队列!!!");
}
}
}
如上,如果消费失败,则会重新放回队列,但是如果一直失败,则会产生死循环,我们一般不会使他一直重新放入队列,当重放了一定次数,就认为失败了,就写入数据库中,等待手动操作。
比如下面的定时任务,当重试次数高于3时,直接设置失败:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16@Scheduled(cron = "0/10 * * * * ?")
public void mailResendTask() {
List<MailSendLog> logs = mailSendLogService.getMailSendLogsByStatus();
if (logs == null || logs.size() == 0) {
return;
}
logs.forEach(mailSendLog->{
if (mailSendLog.getCount() >= 3) {
mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败
}else{
mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date());
Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId());
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId()));
}
});
}
参考:
https://my.oschina.net/648885471/blog/2046143
https://juejin.im/post/6844903843612852232
https://www.cnblogs.com/sw008/p/11054293.html
https://www.jianshu.com/p/2c5eebfd0e95
https://juejin.im/post/6844903892887535624
https://www.cnblogs.com/sw008/p/11054292.html
https://www.cnblogs.com/piaolingzxh/p/5448927.html
https://my.oschina.net/xiaolyuh/blog/1619586
https://blog.csdn.net/hry2015/article/details/79545157