007-消息队列-RocketMQ
banner 2022-03-14 22:01:13 微服务SpringCloudSpringCloudAlibabaRocketMQ
# 一、Spring 实现异步的方法
- AsyncRestTemplate
- @Async
- WebClient(Spring5.0引入)
- MQ
# 二、MQ的使用场景
- 异步处理
- 削峰填谷-秒杀
- 解耦微服务
# 三、搭建RocketMQ
下载rocketmq-all-4.5.1-bin-release
启动Name Server
nohup sh bin/mqnamesrv &
1
tail -f ~/logs/rocketmqlogs/namesrv.log
1
启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
1
tail -f ~/logs/rocketmqlogs/broker.log
1
关闭MQ
sh bin/mqshutdown broker
1
sh bin/mqshutdown namesrv
1
# 四、SpringBoot接入RocketMQ
# 4.1 导入jar
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 4.2 增加配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 必须要要的group
group: test-group
1
2
3
4
5
2
3
4
5
# 4.3 内容中心-生产者
rocketMQTemplate.convertAndSend("add-bonus",
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50)
.build()
);
1
2
3
4
5
6
7
2
3
4
5
6
7
# 4.4 用户中心-消费者
@Service
@RocketMQMessageListener(consumerGroup = "test-group", topic = "add-bonus")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
@Override
public void onMessage(UserAddBonusMsgDTO message) {
// 当收到消息后,执行业务
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 五、RocketMQ事务消息
# 5.1 实现事务消息流程
# 5.2 概念
- 半消息
- 暂时无法消费的消息,生产者将消息发送到了MQ,但是这个消息会被标记为
暂时不能投递
,先储存起来,消费者不会去消费这条消息
- 暂时无法消费的消息,生产者将消息发送到了MQ,但是这个消息会被标记为
- 消息回查
- 网络断开或者生产者重启可能会导致丢失事务消息的第二次确认。
# 5.3 事务消息的三种状态
- Commit:提交事务消息,消费者可以消费此消息。
- Rollback:回滚事务消息,broker会删除该消息,消费者不能消费。
- UNKNOWN:broker需要回查确认该消息的状态。
# 5.4 编码实现事务消息
增加一个事务日志表
创建一个 AddBonusTransactionListener
事务消息监听,实现 RocketMQLocalTransactionListener
,重写 executeLocalTransaction
(执行本地事务)和 checkLocalTransaction
(本地事务确定)
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Resource
private ShareService shareService;
@Resource
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.getOrDefault("share_id", 0));
try {
//该方法会修改数据并记录消息的事务记录,方便本地事务进行确认
shareService.auditByIdWithRocketMQLog(shareId, (ShareAuditDTO) arg, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事务确认
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
//获取消息的transactionId
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
//检查表中是否存在该消息的事务日志
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId).build()
);
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
修改发送消息的代码
String transactionId = UUID.randomUUID().toString();
rocketMQTemplate.sendMessageInTransaction(
"tx-add-bonus-group",
"add-bonus",
MessageBuilder.withPayload(UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50)
.build())
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", id)
.build(),
// arg 参数会传递给 executeLocalTransaction方法的 arg
auditDTO
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 六、Spring Cloud Stream
# 6.1 生产者如何接入
# 6.1.1 导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
1
2
3
4
2
3
4
# 6.1.2 增加注解
@EnableBinding(Source.class)
1
# 6.1.3 编写配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
# 用来指定topic
destination: stream-test-topic
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
🧐 这里要注意,接入后,nacos心跳的日志会不停输出,可以设置只输入error
级别的日志。
logging:
level:
com.alibaba.nacos: error
1
2
3
2
3
# 6.1.4 编写生产者
@Resource
private Source source;
@GetMapping("/test-stream")
public String testStream() {
source.output()
.send(
MessageBuilder.withPayload("消息体").build()
);
return "success";
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 6.2 消费者如何接入
# 6.2.1 导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
1
2
3
4
2
3
4
# 6.2.2 增加注解
@EnableBinding(Sink.class)
1
# 6.2.3 编写配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
destination: stream-test-topic
# 只有rocketMQ 必须设置
group: binder-group
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 6.2.4 编写消费者
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info("通过stream收到了消息:messageBody = {}", messageBody);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 6.3 自定义接口实现消息的收发
# 6.3.1 自定义发消息接口
基于[6.1 生产者如何接入](#6.1 生产者如何接入)修改
# 定义一个接口
public interface MySource {
String MY_OUTPUT = "my-output";
@Output(MY_OUTPUT)
MessageChannel output();
}
1
2
3
4
5
6
2
3
4
5
6
# 启动类修改注解
@EnableBinding({Source.class, MySource.class})
1
# 修改配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
# 用来指定topic
destination: stream-test-topic
my-output:
destination: stream-my-topic
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
这里要增加的my-output
要和接口中的@Output
中的内容一致
# 发消息
@Resource
private MySource mySource;
@GetMapping("/test-stream-2")
public String testStream2() {
mySource.output()
.send(
MessageBuilder.withPayload("消息体").build()
);
return "success";
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 6.3.2 自定义收消息接口
基于[6.2 消费者如何接入](#6.2 消费者如何接入)修改
# 定义一个接口
public interface MySink {
String MY_INPUT = "my-input";
@Input(MY_INPUT)
SubscribableChannel input();
}
1
2
3
4
5
6
2
3
4
5
6
# 启动类修改注解
@EnableBinding({Sink.class, MySink.class})
1
# 修改配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
destination: stream-test-topic
# 只有rocketMQ 必须设置
group: binder-group
my-input:
destination: stream-my-topic
group: my-group
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
这里要增加的my-input
要和接口中的@Input
中的内容一致
# 接收消息
@Service
@Slf4j
public class MyTestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void receive(String messageBody) {
log.info("自定义接口消费,通过stream收到了消息:messageBody = {}", messageBody);
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 6.4 补充
Source
Sink
Processor
# 6.5 Stream 监控端点
/actuator/bindings
/actuator/channels
/actuator/health