05-延迟队列
# 一、什么是延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
特殊的消息TTL过期死信队列。
# 二、RabbitMQ中的TTL(Time To Live)
TTL
是什么呢?TTL
是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。
那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
2
3
这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
2
3
4
这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
# 三、使用RabbitMQ实现延迟队列
# 3.1 架构图
生产者发送一条消息,理由不用的RoutingKey,路由到延迟时间不同的队列中,每个队列都设置了不同的TTL属性,并绑定到同一个死信交换机,消息过期后,又被路由到死信队列中。消费者只要监听死信队列进行消费即可。
先声明交换机、队列以及他们的绑定关系
@Configuration
public class TtlQueueConfig {
//普通交换机名称
public static final String X_EXCHANGE = "X";
//死信交换机名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列的名称
public static final String DEAD_LETTER_QUEUE = "QD";
/**
* 声明x交换机
*/
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
/**
* 声明x交换机
*/
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 声明队列A
*/
@Bean("queueA")
public Queue queueA() {
return QueueBuilder.durable(QUEUE_A)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.ttl(10000)
.build();
}
/**
* 声明队列B
*/
@Bean("queueB")
public Queue queueB() {
return QueueBuilder.durable(QUEUE_B)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.ttl(40000)
.build();
}
/**
* 死信队列
*/
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
接下来创建消费者,对死信队列进行消费
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
* 接收消息
*/
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
然后我们来创建生产者
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条时间给两个TTL队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
打开RabbitMQ管理后台,我们可以看到刚刚创建的交换机和队列信息
- 交换机
- 队列
然后我们发送一条消息
GET http://localhost:8080/ttl/sendMsg/嘻嘻嘻
日志如下:
🤯 不过,使用这种方式的话,每增加一个新的时间需求,就要增加一个队列。如果某个预约提醒功能的场景,那么不是增加无数个队列才能满足需求。
# 四、延迟队列优化
显然,需要一种更通用的方案才能满足需求,那么就只能将TTL设置在消息属性里了。我们来试一试。
- 增加一个新的没有TTL的队列,并配置私信交换机Y
/**
* 声明队列C
*/
@Bean("queueC")
public Queue queueC() {
return QueueBuilder.durable(QUEUE_C)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.build();
}
2
3
4
5
6
7
8
9
10
- 队列绑定交换机
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
2
3
4
5
6
- 生产者发送消息的时候指定消息过期时间(TTL)
@GetMapping("/sendExpirationMsg/{ttl}/{message}")
public void sendMsg(@PathVariable Long ttl, @PathVariable String message) {
log.info("当前时间:{},发送一条时长{}毫秒TTL消息给队列QC:{}", new Date(), ttl, message);
rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl:" + ttl + "ms," + message, msg -> {
msg.getMessageProperties().setExpiration(ttl.toString());
return msg;
});
}
2
3
4
5
6
7
8
然后我们发送两条消息
GET http://localhost:8080/ttl/sendExpirationMsg/2000/第一条消息
GET http://localhost:8080/ttl/sendExpirationMsg/20000/第二条消息
日志如下:
将时间对调,先发送20s过期的消息,在发送2s过期的消息
GET http://localhost:8080/ttl/sendExpirationMsg/20000/第一条消息
GET http://localhost:8080/ttl/sendExpirationMsg/2000/第二条消息
日志如下:
🤯 看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
🤯 该方法存在严重缺陷
# 五、RabbitMQ插件实现延迟队列
# 5.1 安装插件(abbitmq_delayed_message_exchange)
下载地址:https://www.rabbitmq.com/community-plugins.html (opens new window)
下载rabbitmq_delayed_message_exchange
插件,然后解压放置到RabbitMQ的插件目录(../plugins
)。
接下来,进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效,然后重启RabbitMQ。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 5.2 安装插件(Docker版本)
# copy 插件到容器的插件目录
docker cp ~/Downloads/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez b7aaa6b4269b:/plugins
# 进入容器,启动插件
docker exec -it b7aaa6b4269b bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查看启用的插件
rabbitmq-plugins list
# 退出容器并重启容器
exit
docker restart b7aaa6b4269b
2
3
4
5
6
7
8
9
10
11
12
13
安装成功后,添加交换机的时候增加了一个x-delayed-message类型
# 5.3 代码实现
较之前,现在的延迟,作用在交换机上。
- 声明交换机、队列、以及绑定关系
@Configuration
public class DelayedQueueConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
/**
* 声明队列
*/
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
/**
* 声明交换机
*/
@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
/*
* name 交换机名
* type 交换机类型
* durable 是否持久化
* autoDelete 是否自动删除
* arguments 自定义参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
- 延时队列消费者
@Slf4j
@Component
public class DelayedQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
}
}
2
3
4
5
6
7
8
9
10
- 消息生产者
@GetMapping("/sendDelayedMsg/{ttl}/{message}")
public void sendDelayedMsg(@PathVariable Integer ttl, @PathVariable String message) {
log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), ttl, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelay(ttl);
return msg;
});
}
2
3
4
5
6
7
8
发送消息:
GET http://localhost:8080/ttl/sendDelayedMsg/20000/第一条消息
GET http://localhost:8080/ttl/sendDelayedMsg/2000/第二条消息
日志如下:
# 六、🤯 总结
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。