RabbitMQ 6 交换机确认回调 消息回退 备份交换机 幂等性
uwupu 啦啦啦啦啦

发布确认回调

通过实现RabbitTemplate.ConfirmCallback接口,可以实现当消息向交换机传递失败时,触发回调方法。

配置文件

1
spring.rabbitmq.publisher-confirm-type=correlated
  • 该配置用于设置消息从程序传递到交换机,交换机是否响应结果
  • 参数
    • CORRELATED:关联模式,消息发送到交换机会触发回调方法
    • NONE:禁用发布确认,是默认模式。
    • SIMPLE:
      • 像CORRELATED一样会触发回调方法
      • 发布消息成功后使用rabbitTemplate调用waitForConfirmswaitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

CORRELATED

发布消息

1
2
template.convertAndSend(RabbitMQConfig.EXCHANGE,RabbitMQConfig.QUEUE_KEY,m,new CorrelationData("这是ID"));
//new CorrelationData("这是ID"),这个参数会在回调函数中获得,一般用于在回调函数中区分被回调的消息。

接收回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class FailedMessageConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate template;

@PostConstruct
public void init(){
template.setConfirmCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String m = "";
if (correlationData!=null)m = correlationData.getId();//这里获取消息的ID
if (ack){
System.out.println("成功发送消息:"+m);
}else {
System.out.println("发送消息失败,原因:"+cause);
}
}
}
1
发送消息失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange_study_2131q' in vhost '/', class-id=60, method-id=40)

消息回退

  • 当仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现消息该消息不可路由,消息会被直接丢弃。

  • 解决方法

    • 设置mandatory参数,当消息不可到达目的地时将消息返回给生产者。

配置

1
2
#当消息不可被路由,回退消息给生产者
spring.rabbitmq.publisher-returns=true

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class FailedMessageConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
RabbitTemplate template;

@PostConstruct
public void init(){
template.setReturnsCallback(this);
}

//可以当消息传递过程中,消息不可达目的地,则返回给生产者
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.printf("消息%s,被交换机%s退回,退回原因%s,路由key:%s",
returned.getMessage(),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
}
1
消息 你好 ,被交换机 exchange_study_2131 退回,退回原因 NO_ROUTE ,路由key: queue_key_study_2131qwe

备份交换机

当交换机无法将消息传递给队列,会将消息传递给备份交换机

可以在交换机中配置参数:alternate-exchange值为备份交换机的名字。

1
2
3
4
@Bean
public Exchange directExchange2131(){
return ExchangeBuilder.directExchange(EXCHANGE).withArgument("alternate-exchange",BACKUP_EXCHANGE).build();
}

消息回退与备份交换机

消息回退备份交换机同时开启时,备份交换机优先级更高

死信队列和备份交换机的区别

使用

  • 死信队列:x-dead-letter-exchange设置死信交换机,x-dead-letter-routing-key设置使用的路由key。

  • 备份交换机:alternate-exchange指定备份交换机的名字。

场景

  • 死信队列:负责处理交换机的消息无法到达队列的情况,消息被拒绝/队列达最大长度/TTL过期
  • 备份交换机:当消息无法到达交换机时,消息将尝试到达备份交换机。

image

其他

幂等性

介绍

  • 要求用户对于同一操作发起的一次或多次请求的结果是一致的,不会因多次请求产生副作用。

  • 解决的问题如:支付,用户购买商品支付,扣款成功,但网络异常,导致二次扣款

如何出现

  • 消费者在MQ消费消息,

  • 当MQ向消费者发送消息,消费者要返回ACK时,网络中断,

  • 此时MQ没有收到确认消息,

    • 这个消息会被发送给其他消费者,
    • 或在网络恢复后将消息重新发给消费者
  • 产生重复消费

解决思路

  • 为消息标记一个全局ID或一个唯一标识,每次消费消息前判断该消息是否被消费过。

主流解决方案

  • 唯一ID+指纹码机制
    • 利用数据库主键去重
    • 指纹码:为消息添加唯一标志。
    • 优势:简单
    • 劣势:在高并发时,会出现性能问题。
  • 分布式锁,如利用Redis原子性实现(推荐的方式)
    • 使用Redis的setnx命令。
 评论