
RabbitMQ交换机
消息路线:生产者 -> 交换机 -> 队列 -> 消费者
交换机接收生产者的消息,依据routingKey按照指定规则将消息转交给队列。
类型:
,direct
,topic
,headers
,fanout
默认交换机
即:
使用queueName代替routingKey,可以直接向队列中插入数据。
绑定关系
除了默认交换机,其他交换机都可将队列和RoutingKey进行绑定。
交换机通过RoutingKey来匹配队列,将消息投递。
1 | channel.queueBind(queueName,Exchange,RoutingKey) |
Fanout交换机
将接收到的所有消息广播。
队列与该交换机绑定不需要RoutingKey,
只要绑定了该交换机,该交换机收到的消息都会被传递给绑定的队列。
1 | /* |
Direct交换机
将接收到的消息,按照RoutingKey完全匹配的队列进行消息投递。
Topic交换机
支持RoutingKey模糊匹配。
Topic交换机的RoutingKey不能随意写,必须是:一个单词列表,用.
分割。
如:quick.orange.rabbit
其他规则:
*
可以代替一个单词#
可以代替0个或多个单词
若匹配
RoutingKey
使用#
,则匹配任意队列。
临时队列
可以指定让服务器创建一个临时队列,
- 临时队列是一个随机名称的队列,
- 一旦断开了消费者的连接,队列将被自动删除。
1 | String queueName = channel.queueDeclare().getQueue() |
这种队列有AD
标志。AD,即AutoDelete。
死信队列
死信,即无法被消费的信息。
当Consumer从Queue取出消息进行消费,但由于某些原因使得消费没有被消费,若没有后续的处理,这些消息就变成了死信。
使用RabbitMQ的死信队列机制,当消息发生异常,消息会进入到死信队列中。
死信的出现
- 消息TTL过期
- 队列达最大长度,多余添加到
- 消息被拒绝
- basic.reject或basci.nack并且requeue=false
流程
- 生产者生产消息
- 消息被拒绝/队列达最大长度/TTL过期
- 进入dead_exchange死信交换机
- 进入死信队列
使用
在需要进行死信处理的队列上添加死信arguments
1
2
3
4channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{
put("x-dead-letter-exchange",DEAD_EXCHANGE); //死信提交的交换机
put("x-dead-letter-routing-key","lisi"); //死信
}});
实战1 消息TTL过期
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class Consumer {
//普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = MQUtils.getChannel();
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//普通交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//死信交换机
//声明队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{
put("x-dead-letter-exchange",DEAD_EXCHANGE);//为普通队列添加死信队列
put("x-dead-letter-routing-key","lisi");//添加死信使用的key
}});
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//声明死信队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println();
channel.basicConsume(NORMAL_QUEUE,true,(String consumerTag, Delivery message)->{
System.out.println("Consumer1接收到消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
},(String consumerTag)->{
});
}
}生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtils.getChannel();
for (int i = 1; i < 11; i++) {
String message = "info"+i;
channel.basicPublish(
Consumer.NORMAL_EXCHANGE,
"zhangsan",
new AMQP.BasicProperties().builder().expiration("10000").build(),
message.getBytes());//生产信息,配置信息的过期时间为10000毫秒
}
}
}
实战2 消息队列溢出
在声明队列的时候,设置队列最大长度,就可实现消息队列消息溢出,消息溢出后即为死信;
若配置了死信队列,消息会被转移到死信队列中。
1 | channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{ |
实战3 消息拒绝
对消息进行nack,即拒绝消息,并设置不自动回到队列,即可实现消息拒绝,消息会被定为死信。
1 | /* |
基于死信实现的延迟队列
为队列设置消息的ttl过期,即延时队列。
延时队列并非RabbitMQ原生队列,而是在基础上衍生出来的一种使用方式。
在RabbitMQ中为队列设置ttl,并设置死信队列,消费者在死信队列进行消费,即延时队列。
注意:RabbitMQ只会检测第一个消息的过期时间,不会检测后面的消息是否过期。
弥补:插件解决
使用场景
- 订单在十分钟内未支付则自动取消
- 新创建的店铺,若10天内没有上传商品,则自动发送消息提醒
- 用户注册成功,若3天内未登录自动提醒
真正的延时队列交换机
安装rabbitmq_delayed_message_exchange
插件实现
安装后会增加延时交换机类型x-delayed-message
。
- 使用该交换机的消息,可以增加
x-delay
属性来支持消息的延时传递,单位毫秒,若未设置x-delay
属性,消息会被立即传递。 - 该交换机遇上带有
x-delay
属性的消息后,会将该消息缓存,直到指定的时间结束后,消息会进入到指定的队列中。
使用
创建交换机
- 需要设交换机类型为
x-delayed-message
- 添加参数
x-delayed-type
为交换机类型,比如direct
1 | public Exchange exchange(){ |
向交换机发送消息
- 添加参数
x-delay
为要进行延时的长度,单位毫秒
1 | public void send3(String message,Integer delay){ |
原理
机制
安装插件后会生成新的Exchange类型x-delayed-message
,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia
(一个分布式数据系统)表中,并且当前节点是磁盘节点,那么节点重启后,消息还能保留。检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type
类型标记的交换机类型投递至目标队列。但是要注意的是,如果集群中只有一个磁盘节点,如果说磁盘节点丢失,或者节点上的插件失效。意味着消息将会丢失。
延迟队列总结
其他选择
- Java的
DelayQueue
Redis
的Zset
- Quartz
- 或Kafka的时间轮。