RabbitMQ 3 消息模式 使用场景
uwupu 啦啦啦啦啦

消息模式

Simple,Work Queue,Publish/Subscribe,Routing,Topics,RPC;

投递消息过程中,若没有指定交换机,则使用默认的交换机。

Simple简单模式

一个生产者,一个消费者

image

Work Queue工作队列

一个生产者,多个消费者,一个消息只能消费一次。

image

Publish/Subscribe

发布订阅模式。使用fanout交换机。

生产者首先投递消息到交换机,订阅了这个交换机的所有队列就会收到生产者投递的消息。

image

Routing路由模式

使用direct交换机

生产者生产消息投递到direct交换机中,交换机根据消息携带的RoutingKey匹配相应的队列

  • 比如有三个队列:Q1,Q2,Q3;
  • 在direct交换机中,指定
    • Q1,mail
    • Q2,phone
    • Q3,Wechat
    • Q2,Wechat
  • 使用该交换机发送消息的时候,会向指定Key对应的队列中发布数据。
    • 比如RoutingKey为Wechat时,发布的数据会被发送到Q3和Q2.

image

Topics

  • 生产者将消息投递到topic交换机,该交换机支持根据RoutingKey对队列进行模糊匹配

  • 发送到Topics消息的RoutingKey不能随意写,必须是一个单词列表,每个单词用.分隔开;

    • 如:

      • com.yn.ZhangSan

      • asd.qwe.zxc

      • 其中有两个替换符

        • #:可以替代0个或多个单词

        • *:可以替代一个单词

同时根据header和RoutingKey进行匹配;

有两种类型

  • all:header头必须完全匹配
  • any:在Queue的所有键值对在消息的Headers中都能找到,就可匹配成功。

Header为一个key-value键值对,默认情况下headers为any类型

声明Exchange和Queue,Java示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory(){{
setHost("192.168.227.131");
setPort(5672);
setUsername("admin");
setPassword("admin123");
setVirtualHost("/");//
}};

Connection connection = null;
Channel channel = null;
try {
//2. 获取连接
connection = connectionFactory.newConnection("Test生产者");
//3. 通过连接获取通道Channel
channel = connection.createChannel();

String queueName = "Queue2";

//声明队列
/**
* @params1 队列名称
* @params2 是否要持久化durable
* @params3 排他性,是否独占
* @params4 是否自动删除,即随着最后一个消费者消息完毕后是否把队列删除
* @params5 携带附属参数
*/
channel.queueDeclare("Q1",false,false,false,null);
channel.queueDeclare("Q2",false,false,false,null);
channel.queueDeclare("Q3",false,false,false,null);


channel.exchangeDeclare("Exchange1","direct");
channel.queueBind("Q1","Exchange1","order");
channel.queueBind("Q2","Exchange1","apple");
channel.queueBind("Q3","Exchange1","pear");

//准备消息内容
String exchange = "Exchange1";
String routeKey = "apple";
String message = "Hello declare";
/**
* @params1 交换机,消息订阅类型
* @params2 RoutingKey
* @params3 是否持久化,后续详讲
* @params4 信息的二进制数据
*/
channel.basicPublish(exchange,routeKey,null,message.getBytes());

System.out.println("消息发送成功");

}catch (Exception e){
e.printStackTrace();
}finally {
//关闭通道
if (channel!=null && channel.isOpen()){
channel.close();
}
//关闭连接
if (connection!=null && connection.isOpen()){
connection.close();
}
}

}
}

RabbitMQ消息应答

类型

自动应答

只要消费者从队列中获取了消息,无论是否消费成功,都认为消息已被消费,队列会把该消息数据删除。

手动应答

正常应答

  • Channel.basicAck 肯定确认
    • RabbitMQ了解到消息成功处理,将消息丢弃
  • Channel.basicNack 否定确认
  • Channel.basicReject 否定确认,相对于Nack少一个参数;
    • 不处理该消息了直接拒绝,可以将其丢弃。

Multiple 批量应答

可以减少网络拥堵。

1
2
channel.basicAck(deliveryTag,true);
//参数2,true表示批量应答

Multiple:

  • true代表批量应答Channel上未应答的消息
    • 如Channel上有消息5,6,7,8,当前tag是8,那么此时5-8这些未应答的消息都会被确认应答。
  • false
    • 同上面相比,不会确认5-7的数据。
    • 一般建议,不要批量应答
1
2
3
4
5
6
7
channel.basicConsume("q1",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费成功:"+new String(body));
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
});

重新入队

  • 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送Ack确认,RabbitMQ将了解消息未完全处理,并将其重新排队。

  • 若此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。

  • 这样,即使某个消费者偶然异常,也能保证不会丢失任何消息。

 评论