RabbitMQ 4

RabbitMQ持久化
队列持久化
介绍
使用
在queueDeclare
方法上将durable设为true来实现持久化。
1 | channel.queueDeclare(MQUtils.KEY,true,false,true,null); |
消息持久化
介绍
Hole,…
使用
修改basicPublish
的参数props为MessageProperties.PERSISTENT_TEXT_PLAIN
。
1 | channel.basicPublish("",MQUtils.KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes()); |
消息分发模式
注:该模式并非RabbitMQ原本的模式,而是依据RabbitMQ使用方式来衍生出来的使用策略。
默认为轮询模式。
- 轮询模式:一个消费者一条,按均分配;
- 公平分发:根据消费者消费能力进行公平分发,处理快的处理的多,处理慢的处理的少。
在Java中,通过设置是否进行自动应答和Qos来进行公平分发模式的实现。
轮询模式 Round-Robin
特点:当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直到消费者消费完成
若有2个消费者,1个生产者,不论两个消费者速度快慢,两个消费者收到的消息数量一致。
1 | channel.basicConsume("Queue1", true, new DeliverCallback() { |
不公平分发
能者多劳。
在消费者的channel设置参数Qos为1,即可实现公平分发。
- Qos为0则表示轮询模式。
channel.basicQos(1);
1 | channel.basicQos(1); |
预取值
prefetch
信道缓存区的大小,通过basic.qos来设置消费者“预取计数”值来完成;
定义通道上允许的未确认消息的最大数量,一旦达到配置的数量,RabbitMQ将停止在通道上传递更多消息,直到至少有1个未处理的消息被确认。
1 | channel.basicQos(prefetchSize); |
发布确认
介绍
信道的confirm模式;
在信道confirm模式下,生产者向Broker发送消息后,都会收到来自Broker的反馈;
- 若生产者要求消息持久化,Broker会在将消息持久化之后向生产者发送反馈;
- 原理:在confirm模式下,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列后,Broker会向生产者发送一条确认通知(包含消息的ID),这样生产者就会了解到消息是否成功送达队列。若消息是可持久化的,则确认通知会在消息写入磁盘后发出;
- Broker的确认通知中delivery-tag域包含了消息是否已送达和消息的确认序列号;
- 若RabbitMQ因内部原因导致消息丢失,就会发送一条nack消息表示消息出错。
- Broker可以设置basic.ack的multiple域,表示这个序列号之前的所有消息都已经得到处理。
- Broker的确认通知中delivery-tag域包含了消息是否已送达和消息的确认序列号;
- confirm是异步的,生产者在等待确认通知的同时也可以继续发送下一跳消息;生产者在收到nack消息后可以在回调方法中处理该nack消息。
开启confirm模式
在信道上使用confirmSelect()
方法开启confirm模式。
channel.confirmSelect()
确认发布模式
单个确认发布
一种简单的确认方式,它是一种同步确认发布的方式,
- 发布一个消息后,只有这个消息被确认,后续的消息才能继续发布
- waitForConfirmDie(long),在指定时间内收到确认则返回,若在时间范围内未确认则抛出异常。
缺点
- 发布速度特别慢
场景:每秒不超过数百条发布消息吞吐量,有些应用程序是可以用的。
使用:
1 | //开启发布确认 |
批量
- 先发一批消息,然后一起确认;
- 优点:速度快
- 缺点:不能确定是哪条消息出错
- 使用:
1 | //开启发布确认 |
异步
hole…
使用
1 | //消息监听器 |
评论