RabbitMQ 5 交换机 死信队列 延时队列
uwupu 啦啦啦啦啦

RabbitMQ交换机

消息路线:生产者 -> 交换机 -> 队列 -> 消费者

交换机接收生产者的消息,依据routingKey按照指定规则将消息转交给队列。

类型: directtopicheadersfanout

默认交换机

即:

使用queueName代替routingKey,可以直接向队列中插入数据。

绑定关系

除了默认交换机,其他交换机都可将队列RoutingKey进行绑定。

交换机通过RoutingKey来匹配队列,将消息投递。

1
channel.queueBind(queueName,Exchange,RoutingKey)

Fanout交换机

将接收到的所有消息广播

队列与该交换机绑定不需要RoutingKey,

只要绑定了该交换机,该交换机收到的消息都会被传递给绑定的队列。

1
2
3
4
5
6
7
8
/*
* exchangeName:交换机的名字
* typeName:交换机类型,fanout等
* public enum BuiltinExchangeType {
* DIRECT("direct"),FANOUT("fanout"),TOPIC("topic"), * HEADERS("headers");
* }
*/
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT);

Direct交换机

将接收到的消息,按照RoutingKey完全匹配的队列进行消息投递。

Topic交换机

支持RoutingKey模糊匹配。

Topic交换机的RoutingKey不能随意写,必须是:一个单词列表,用.分割。

如:quick.orange.rabbit

  • 其他规则:

    • *可以代替一个单词

    • #可以代替0个或多个单词

  • 若匹配RoutingKey使用#,则匹配任意队列。

临时队列

可以指定让服务器创建一个临时队列,

  • 临时队列是一个随机名称的队列,
  • 一旦断开了消费者的连接,队列将被自动删除。
1
String queueName = channel.queueDeclare().getQueue()

这种队列有AD标志。AD,即AutoDelete。

死信队列

死信,即无法被消费的信息。

当Consumer从Queue取出消息进行消费,但由于某些原因使得消费没有被消费,若没有后续的处理,这些消息就变成了死信

使用RabbitMQ的死信队列机制,当消息发生异常,消息会进入到死信队列中。

死信的出现

  • 消息TTL过期
  • 队列达最大长度,多余添加到
  • 消息被拒绝
    • basic.reject或basci.nack并且requeue=false

流程

  1. 生产者生产消息
  2. 消息被拒绝/队列达最大长度/TTL过期
  3. 进入dead_exchange死信交换机
  4. 进入死信队列

使用

  • 在需要进行死信处理的队列上添加死信arguments

    1
    2
    3
    4
    channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{
    put("x-dead-letter-exchange",DEAD_EXCHANGE); //死信提交的交换机
    put("x-dead-letter-routing-key","lisi"); //死信
    }});

实战1 消息TTL过期

  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class Consumer {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列
    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{

    Channel channel = MQUtils.getChannel();

    //声明交换机
    channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//普通交换机
    channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//死信交换机
    //声明队列
    channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{
    put("x-dead-letter-exchange",DEAD_EXCHANGE);//为普通队列添加死信队列
    put("x-dead-letter-routing-key","lisi");//添加死信使用的key
    }});
    channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//声明死信队列
    channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定队列
    channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    System.out.println();

    channel.basicConsume(NORMAL_QUEUE,true,(String consumerTag, Delivery message)->{
    System.out.println("Consumer1接收到消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
    },(String consumerTag)->{

    });


    }

    }
  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = MQUtils.getChannel();
    for (int i = 1; i < 11; i++) {
    String message = "info"+i;
    channel.basicPublish(
    Consumer.NORMAL_EXCHANGE,
    "zhangsan",
    new AMQP.BasicProperties().builder().expiration("10000").build(),
    message.getBytes());//生产信息,配置信息的过期时间为10000毫秒
    }
    }
    }

实战2 消息队列溢出

在声明队列的时候,设置队列最大长度,就可实现消息队列消息溢出,消息溢出后即为死信;

若配置了死信队列,消息会被转移到死信队列中。

1
2
3
4
5
channel.queueDeclare(NORMAL_QUEUE,false,false,false,new HashMap<String,Object>(){{
put("x-dead-letter-exchange",DEAD_EXCHANGE);
put("x-dead-letter-routing-key","lisi");
put("x-max-length",6);//设置队列最大长度
}});

实战3 消息拒绝

对消息进行nack,即拒绝消息,并设置不自动回到队列,即可实现消息拒绝,消息会被定为死信。

1
2
3
4
5
6
7
/*
p1:消息的Tag
p2:是否批量
p3:是否回到队列,这里为false。
*/
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);

基于死信实现的延迟队列

为队列设置消息的ttl过期,即延时队列。

延时队列并非RabbitMQ原生队列,而是在基础上衍生出来的一种使用方式。

在RabbitMQ中为队列设置ttl,并设置死信队列,消费者在死信队列进行消费,即延时队列

注意:RabbitMQ只会检测第一个消息的过期时间,不会检测后面的消息是否过期。

弥补:插件解决

使用场景

  1. 订单在十分钟内未支付则自动取消
  2. 新创建的店铺,若10天内没有上传商品,则自动发送消息提醒
  3. 用户注册成功,若3天内未登录自动提醒

真正的延时队列交换机

安装rabbitmq_delayed_message_exchange插件实现

安装后会增加延时交换机类型x-delayed-message

  • 使用该交换机的消息,可以增加x-delay属性来支持消息的延时传递,单位毫秒,若未设置x-delay属性,消息会被立即传递。
  • 该交换机遇上带有x-delay属性的消息后,会将该消息缓存,直到指定的时间结束后,消息会进入到指定的队列中。

使用

创建交换机

  • 需要设交换机类型为x-delayed-message
  • 添加参数x-delayed-type为交换机类型,比如direct
1
2
3
public Exchange exchange(){
return new ExchangeBuilder(DELAY_EXCHANGE,"x-delayed-message").withArgument("x-delayed-type","direct").build();
}

向交换机发送消息

  • 添加参数x-delay为要进行延时的长度,单位毫秒
1
2
3
4
5
6
public void send3(String message,Integer delay){
template.convertAndSend("delay_exchange_study","delay_message_queue_key",message,(msg)->{
msg.getMessageProperties().setDelay(delay);
return msg;
});
}

原理

机制

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,并且当前节点是磁盘节点,那么节点重启后,消息还能保留。检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。但是要注意的是,如果集群中只有一个磁盘节点,如果说磁盘节点丢失,或者节点上的插件失效。意味着消息将会丢失。

延迟队列总结

其他选择

  • Java的DelayQueue
  • RedisZset
  • Quartz
  • 或Kafka的时间轮。
 评论