06-发布确认高级篇
在生产环境中由于一些不明原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:
保证四个阶段的可靠性,才能保证整个系统的可靠性。
- 生产者发出后保证到达了MQ。
- MQ收到消息保证分发到了消息对应的Exchange。
- Exchange分发消息入队之后保证消息的持久性。
- 消费者收到消息之后保证消息的正确消费。
# 一、生产者发送消息到MQ失败
在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。
为了解决这个问题,RabbitMQ引入了事务机制和发送方确认机制(publisher confirm),由于事务机制过于消耗性能所以一般不用,这里我们重点讨论发送方确认机制。
# 1.1 打开此功能需要一下配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
# 打开消息确认机制
publisher-confirm-type: correlated
2
3
4
5
6
7
8
publisher-confirm-type 可以设置为 NONE
(默认值) CORRELATED
SIMPLE
- NONE
- 禁用发布确认模式,是默认值
- CORRELATED 推荐使用👍🏻
- 发布消息成功到交换器后会触发回调方法
- SIMPLE
- 发布消息成功到交换器后会触发回调方法
- 发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
# 1.2 声明队列、和交换机以及RoutingKey
@Configuration
public class ConfirmConfig {
//交换机名
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列名
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 1.3 消费者
@Component
@Slf4j
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message) {
java.lang.String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("接收到的队列confirm.queue消息:{}", msg);
}
}
2
3
4
5
6
7
8
9
10
# 1.4 生产者
生产者代码里我们看到又多了一个参数:CorrelationData
,这个参数是用来做消息的唯一标识。在下面👇🏻的ConfirmCallback中要使用到。
@Slf4j
@RequestMapping("/confirm")
@RestController
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
log.info("发送消息内容为:{}", message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1.5 ConfirmCallback
在RabbitTemplate
中有个内部接口ConfirmCallback
可以自己实现一些这个接口,交换机收到或者收不到都会回调这个接口
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调接口
*
* @param correlationData 保存回调消息ID及相关信息
* @param ack 交换机瘦小消息
* @param cause 可选原因,如果可用,则为 nack,否则为 null。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if (ack) {
log.info("交换机已经收到ID为:{}的消息", id);
} else {
log.info("交换机还未收到ID为:{}的消息,原因是:{}", id, cause);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 1.6 测试
- 正常情况下,生产者发送消息,交换机接收到消息
GET http://localhost:8080/confirm/sendMsg/消息1
日志如下:
2021-10-25 00:22:01.047 INFO 97090 --- [nio-8080-exec-1] t.b.r.controller.ProducerController : 发送消息内容为:消息1
2021-10-25 00:22:01.055 INFO 97090 --- [nectionFactory1] top.banner.rabbitmq.config.MyCallBack : 交换机已经收到ID为:1的消息
2021-10-25 00:22:01.057 INFO 97090 --- [ntContainer#0-1] t.b.rabbitmq.consumer.ConfirmConsumer : 接收到的队列confirm.queue消息:消息1
2
3
- 如果消息发送失败或者找不到对应的交换机时(修改发送的时候的交换机名称为
ConfirmConfig.CONFIRM_EXCHANGE_NAME + "1"
)
GET http://localhost:8080/confirm/sendMsg/消息1
日志如下:
2021-10-25 00:20:38.700 INFO 97064 --- [nio-8080-exec-1] t.b.r.controller.ProducerController : 发送消息内容为:消息1
2021-10-25 00:20:38.706 ERROR 97064 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60, method-id=40)
2021-10-25 00:20:38.708 INFO 97064 --- [nectionFactory2] top.banner.rabbitmq.config.MyCallBack : 交换机还未收到ID为:1的消息,原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60, method-id=40)
2
3
4
🤔Tip:消息确认失败不只有消息没发过去会触发,消息发过去但是找不到对应的Exchange,也会触发。
# 二、路由失败
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
# 2.1 开启配置
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
publisher-confirm-type: correlated
# 打开消息退回机制
publisher-returns: true
2
3
4
5
6
7
8
9
10
11
12
🤔 **publisher-return ** 模式可以在消息没有被路由到指定的Queue时将消息返回,而不是丢弃。
🤔 spring.rabbitmq.template.mandatory=true 指定消息在没有被队列接收时是否强行退回还是直接丢弃。
通过 org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
以及org.springframework.boot.autoconfigure.amqp.RabbitTemplateConfigurer
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
2
3
4
5
6
7
8
9
public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
template.setConnectionFactory(connectionFactory);
if (this.messageConverter != null) {
template.setMessageConverter(this.messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template templateProperties = this.rabbitProperties.getTemplate();
if (templateProperties.getRetry().isEnabled()) {
template.setRetryTemplate(new RetryTemplateFactory(this.retryTemplateCustomizers)
.createRetryTemplate(templateProperties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
}
map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(templateProperties::getExchange).to(template::setExchange);
map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
}
/**
* 获取spring.rabbitmq.template.mandatory属性配置;
* 这里面会有三种可能,为null、false、true
* 而只有在mandatory为null时才会读取publisher-return属性值
**/
private boolean determineMandatoryFlag() {
Boolean mandatory = this.rabbitProperties.getTemplate().getMandatory();
return (mandatory != null) ? mandatory : this.rabbitProperties.isPublisherReturns();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
🤯 由此可以看出:
- spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级
- spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值
- spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定
# 2.2 配置回退回调
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调接口
*
* @param correlationData 保存回调消息ID及相关信息
* @param ack 交换机瘦小消息
* @param cause 可选原因,如果可用,则为 nack,否则为 null。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if (ack) {
log.info("交换机已经收到ID为:{}的消息", id);
} else {
log.info("交换机还未收到ID为:{}的消息,原因是:{}", id, cause);
}
}
/**
* 可以在消息传递过程中不可达目的地时将消息返回给生产者
* 消息不可路由时
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息:{},被交换机:{}退回,退回原因:{},路由Key:{}"
, new String(returned.getMessage().getBody(), StandardCharsets.UTF_8)
, returned.getExchange(), returned.getReplyText(), returned.getRoutingKey());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 1.2 测试
- 正常情况下,交换机路由消息到队列
GET http://localhost:8080/confirm/sendMsg/消息1
日志如下:
2021-10-25 01:00:16.024 INFO 97463 --- [nio-8080-exec-1] t.b.r.controller.ProducerController : 发送消息内容为:消息1
2021-10-25 01:00:16.030 INFO 97463 --- [nectionFactory1] top.banner.rabbitmq.config.MyCallBack : 交换机已经收到ID为:1的消息
2021-10-25 01:00:16.033 INFO 97463 --- [ntContainer#0-1] t.b.rabbitmq.consumer.ConfirmConsumer : 接收到的队列confirm.queue消息:消息1
2
3
- 路由到队列失败,或者队列不存在(修改发送的时候的队列名称为
ConfirmConfig.CONFIRM_ROUTING_KEY + "1"
)
GET http://localhost:8080/confirm/sendMsg/消息1
日志如下:
2021-10-25 01:02:20.979 INFO 97511 --- [nio-8080-exec-1] t.b.r.controller.ProducerController : 发送消息内容为:消息1
2021-10-25 01:02:20.983 ERROR 97511 --- [nectionFactory1] top.banner.rabbitmq.config.MyCallBack : 消息:消息1,被交换机:confirm_exchange退回,退回原因:NO_ROUTE,路由Key:key11
2021-10-25 01:02:20.984 INFO 97511 --- [nectionFactory1] top.banner.rabbitmq.config.MyCallBack : 交换机已经收到ID为:1的消息
2
3
🤔 这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。
# 三、消费者无法正常消费
最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。
打开手动消息确认之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。
当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。
所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。