RabbitMQ 4
uwupu 啦啦啦啦啦

RabbitMQ持久化

队列持久化

介绍

使用

queueDeclare方法上将durable设为true来实现持久化。

1
2
channel.queueDeclare(MQUtils.KEY,true,false,true,null);
^

消息持久化

介绍

Hole,…

使用

修改basicPublish的参数props为MessageProperties.PERSISTENT_TEXT_PLAIN

1
channel.basicPublish("",MQUtils.KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());

消息分发模式

注:该模式并非RabbitMQ原本的模式,而是依据RabbitMQ使用方式来衍生出来的使用策略。

默认为轮询模式。

  • 轮询模式:一个消费者一条,按均分配;
  • 公平分发:根据消费者消费能力进行公平分发,处理快的处理的多,处理慢的处理的少。

在Java中,通过设置是否进行自动应答和Qos来进行公平分发模式的实现。

轮询模式 Round-Robin

特点:当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直到消费者消费完成

若有2个消费者,1个生产者,不论两个消费者速度快慢,两个消费者收到的消息数量一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
channel.basicConsume("Queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String( message.getBody()));
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接受失败。");
}
});

不公平分发

能者多劳。

在消费者的channel设置参数Qos为1,即可实现公平分发。

  • Qos为0则表示轮询模式。

channel.basicQos(1);

1
2
3
4
5
6
7
8
9
10
11
12
13
channel.basicQos(1);
channel.basicConsume("Queue1",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费成功:"+new String(body));
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
});

预取值

prefetch

信道缓存区的大小,通过basic.qos来设置消费者“预取计数”值来完成;

定义通道上允许的未确认消息的最大数量,一旦达到配置的数量,RabbitMQ将停止在通道上传递更多消息,直到至少有1个未处理的消息被确认。

1
channel.basicQos(prefetchSize);

发布确认

介绍

  • 信道的confirm模式;

  • 在信道confirm模式下,生产者向Broker发送消息后,都会收到来自Broker的反馈;

    • 若生产者要求消息持久化,Broker会在将消息持久化之后向生产者发送反馈;
    • 原理:在confirm模式下,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列后,Broker会向生产者发送一条确认通知(包含消息的ID),这样生产者就会了解到消息是否成功送达队列。若消息是可持久化的,则确认通知会在消息写入磁盘后发出;
      • Broker的确认通知中delivery-tag域包含了消息是否已送达和消息的确认序列号;
        • 若RabbitMQ因内部原因导致消息丢失,就会发送一条nack消息表示消息出错。
      • Broker可以设置basic.ack的multiple域,表示这个序列号之前的所有消息都已经得到处理。
    • confirm是异步的,生产者在等待确认通知的同时也可以继续发送下一跳消息;生产者在收到nack消息后可以在回调方法中处理该nack消息。

开启confirm模式

在信道上使用confirmSelect()方法开启confirm模式。

channel.confirmSelect()

确认发布模式

单个确认发布

  • 一种简单的确认方式,它是一种同步确认发布的方式,

    • 发布一个消息后,只有这个消息被确认,后续的消息才能继续发布
    • waitForConfirmDie(long),在指定时间内收到确认则返回,若在时间范围内未确认则抛出异常。
  • 缺点

    • 发布速度特别慢
  • 场景:每秒不超过数百条发布消息吞吐量,有些应用程序是可以用的。

  • 使用:

1
2
3
4
5
6
7
8
9
10
11
12
//开启发布确认
channel.confirmSelect();
//批量发消息
for (int i = 0; i < MessageCount; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息,发布后马上确认
boolean b = channel.waitForConfirms();
if (b){
System.out.println(i+"消息发送成功");
}
}

批量

  • 先发一批消息,然后一起确认;
  • 优点:速度快
  • 缺点:不能确定是哪条消息出错
  • 使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
//开启发布确认
channel.confirmSelect();
//批量多少开确认一次
int batchSize = 100;
//批量发消息
for (int i = 0; i < MessageCount; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//到100条消息时候,批量确认一次
if ((1+i)%batchSize ==0){
channel.waitForConfirms();
}
}

异步

  • hole…

  • 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//消息监听器
channel.addConfirmListener((tag,multi)->{
//监听 成功 ack
System.out.println("确认成功的消息:"+tag);
},(tag,multi)->{
//监听 失败 nack
System.out.println("确认表示失败的消息:"+tag);
});

//批量发消息
for (int i = 0; i < MessageCount; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
}
 评论