02-轮询分发以及消息应答
# 一、Work Queues(竞争消费者模式)
(using the Java Client)
# 一、轮询分发消息
//得到一个连接的channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
return connection.createChannel();
}
2
3
4
5
6
7
8
9
10
# 1.1 启动两个工作线程准备接收消息
/**
* 工作线程
*/
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
System.out.println("C2消费者启动等待消费......");
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1.2 启动一个发送线程
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 二、消息应答
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显式的设置autoAsk=true关闭这种机制。一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
# 2.1 自动应答
消息发送后立即被认为已经消费成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
# 2.2 手动应答
消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。手动应答的好处是可以批量应答并且减少网络拥堵
# 2.3 消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将知道消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
# 🤔 测试手动应答
☝️创建新的生产者,创建新的队列,并发送消息
public class Task02 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
final Channel channel = RabbitMqUtils.getChannel();
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next() + " " + new Date();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息完成:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
✌️创建两个消费者 C1 和 C2
C1和C2的区别在于,C1可以快速处理消息,C2需要较长的时间才能处理消息
public class Work03 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息处理时间较短");
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println(new Date() + " 接收到消息:" + message);
/*
* 1.消息标记 tag
* 2.是否批量应答未应答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Work04 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2 等待接收消息处理时间较长");
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
SleepUtils.sleep(15);
System.out.println(new Date() + " 接收到消息:" + message);
/*
* 1.消息标记 tag
* 2.是否批量应答未应答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
🤟 观察结果
- 正常情况
- C2突然挂了,消息重新回到队列,被C1消费
# 三、RabbitMQ持久化
# 3.1 队列持久化
之前我们创建的队列都是非持久化的,如果RabbitMQ重启,队列就会被删除。
通过声明队列的时候将durable
参数设置为true
使队列变成持久化状态
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
2
⚠️如果之前声明的队列不是持久化的,需要将原先的队列删除,或者重新创建一个持久化队列,不然会报错。
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
持久化后,标志如下:
# 3.2 消息持久化
发送消息时设置BasicProperties
属性
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
⚠️该方式并不能完全保证消息不会丢失。当消息刚准备存到磁盘的时候,还没有完全储存完,消息还在缓存的一个时间间隔。此时重启可能会丢失消息。
# 3.3 不公平分发
之前我们接触到的分发方式都是轮询分发,但是在两个消费者处理消息能力不同的情况下,我们应该让处理快的消费者更多的去消费消息。
消费者通过设置参数channel.basicQos(1);
实现
意思就是如果这个任务我还没有处理完,你先别分配给我,我目前只能处理一个任务,然后RabbitMQ就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他存储任务的策略。
# 3.4 预读值
消费者消息预读取是一个更加合理和高效的限制未确认消息数量的解决方式。
AMQP 0-9-1协议中定义了basic.qos
方法用于限制信道或者连接上的未确认消息数量,这个消息数据量命名为prefetch_count
。不幸的是,信道其实并不是限制未确认消息数量的理想范畴,因为单个信道有可能有多个消费者订阅多个不同的队列,所以信道和队列需要为发送的每个消息相互协调,以确保消息总数量不超过限制,造成了性能下降,单机性能出现瓶颈,在集群方案中耗时更加严重。
basic.qos
定义了两个属性:
- prefetch_count:预读取消息的数量。
- global:是否全局的。
# 四、发布确认
# 4.1 发布确认原理
发布者确认解决了消息发送传输过程中因网络故章导致丢失消息的问题,RabbitMQ 为消息者确认提供了 自动确认 和 手动确认 两种方式。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
在默认情况下,发布者在将消息发送出去之后,broker 是不会返回任何消息给生产者的,即发布者不知道消息是否正确到达 broker。发布者发布消息,可能会因为网络故障或不稳定导致消息丢失,或者到达服务器大大延迟。RabbitMQ 为解决此问题提供了两种方式:一是基于 AMQP 协议中的事务机制;二是把信道设置成确认模式。
⚠️ 前提:
- 队列必须持久化
- 消息必须持久化
# 4.2 发布确认策略
发布确认默认是关闭的
channel.confirmSelect();//开启发布确认发布
# 单个确认发布
/**
* 单个确认
*/
private static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回false或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
发布消息后只有它被确认发布,后续消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
🧐 优点:简单。
🧐 缺点:发布速度特别慢,每秒不超过数百条发布消息的吞吐量。
# 批量确认发布
/**
* 批量确认
*/
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
🧐 优点:较单个确认发布,极大地提高了吞吐量。
🧐 缺点:当发生故障时导致发布出现问题时,无法知道是哪个消息出现了问题。
# 异步确认发布
/**
* 异步批量确认
*/
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//开启一个线程安全,有序的哈希表,用来存放消息标记对应的消息
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/*
* 消息确认成功的回调
* deliveryTag 消息标记
* multiple 是否是批量
*/
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 返回从头到toKey的子Map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
//消息确认失败的回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息tag:" + deliveryTag + " --- message:" + message);
};
//要提前准备监听器
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
//将标记和对应的消息记录下来,key->在确认模式下,返回要发布的下一条消息的序列号。
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批异步确认消息,耗时" + (end - begin) + "ms");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
🧐 优点:可靠性和效率都最优
🧐 缺点:比同步确认发布实现较复杂