03-Exchange交换机
# Publis/Subscribe(发布订阅模式)
(using the Java Client)
向多个消费者传递同一条消息。这种模式被称为“发布/订阅”。
# 一、Exchange(交换机)
RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者经常甚至根本不知道消息是否会被传送到任何队列。
相反,生产者只能将消息发送到交换机(exchange)。交换机的工作非常简单。一方面它接收来自生产者的消息,另一方面将它们推送到队列中。交换机必须确切地知道如何处理收到的消息。。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
有几种可用的交换类型:直接(direct
)、主题(topic
) 、标题(headers
) 、扇出(fanout
)
channel.basicPublish("", "hello", null, message.getBytes());
- 第一个参数是交换机名称,空字符串表示默认交换机。
# 1.1 临时队列(没有持久化的队列)
每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
创建出来之后长成这样:
# 1.2 绑定(bingding)
binding其实是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系
发送消息时,根据生产者的要求,交换机根据RoutingKey将消息路由给对应的队列。
# 二、Fanout(广播、扇出)交换机
Fanout 是将接收到的所有消息广播到它知道的所有队列中。
🤓 交换机和两个队列的关系如下:
- 接收者1接收消息
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLogs01 等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- 接受者2接收消息
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLogs02 等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 发送日志
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
System.out.println("等待生产者发送消息......");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 三、Direct (直接、路由)交换机
Bingding绑定的时候routingKey不同,及路由模式
根据绑定的routingKey不同,将消费路由到不同的队列中。
🤯 上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。
两个队列console、disk分别绑定direct_logs交换机,但是他们对应的routingKey分别为info、warning、error。
- 接收者1,队列console,多重绑定routingKey info和warning
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//申明一个队列
channel.queueDeclare("console", false, false, false, null);
//队列绑定交换机
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
System.out.println("ReceiveLogsDirect01 等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsDirect01 " + message.getEnvelope().getRoutingKey() + "收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("console", true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
🧐 多重绑定
//队列绑定交换机
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
2
3
- 接收者2,队列disk,绑定routingKey error
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//申明一个队列
channel.queueDeclare("disk", false, false, false, null);
//队列绑定交换机
channel.queueBind("disk", EXCHANGE_NAME, "error");
System.out.println("ReceiveLogsDirect02 等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsDirect02 " + message.getEnvelope().getRoutingKey() + "收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 生产者发送日志,根据routingKey不同发送消息
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
System.out.println("DirectLogs 等待生产者发送消息......");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
if (message.startsWith("i")) {
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(StandardCharsets.UTF_8));
} else if (message.startsWith("w")) {
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(StandardCharsets.UTF_8));
} else {
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生产者发出消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- i开头的消息发到routingKey为info的队列中
- w开头的消息发到routingKey为warning的队列中
- e开头的消息发到routingKey为error的队列中
# 四、Topic(主题)交换机
发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
2
3
4
- 消费者1,routingKey为
*.orange.*
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
//绑定交换机
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("C1等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("接收队列:" + queueName + " 绑定建:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- 消费者2,routingKey为
*.*.rabbit
,lazy.#
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
//绑定交换机
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("C2等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("接收队列:" + queueName + " 绑定建:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- 生产者
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/*
* Q1-->绑定的是中间带orange带3个单词的字符串(*.orange.*)
* Q2-->绑定的是最后一个单词是rabbit的3个单词(*.*.rabbit)
* 第一个单词是lazy的多个单词(lazy.#) *
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31