RabbitMQ 4.9 SpringBoot与RabbitMQ
uwupu 啦啦啦啦啦

SpringBoot整合RabbitMQ

依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Fanout生产者示例

  1. 使用RabbitMQConfiguration初始化Exchange、Queue以及进行队列交换机绑定;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34

    @Configuration
    public class RabbitMQConfiguration {

    //声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange("fanoutExchange",true,false);
    }
    //声明队列:save, sms, email, notice
    @Bean
    public Queue saveQueue(){
    return new Queue("save.fanout.queue",true);
    }
    @Bean
    public Queue smsQueue(){
    return new Queue("sms.fanout.queue",true);
    }
    @Bean
    public Queue emailQueue(){
    return new Queue("email.fanout.queue",true);
    }
    @Bean
    public Queue noticeQueue(){
    return new Queue("notice.fanout.queue",true);
    }


    //完成绑定关系(队列和交换机完成绑定关系)
    @Bean public Binding saveBinding(){return BindingBuilder.bind(saveQueue()).to(fanoutExchange());}
    @Bean public Binding smsBinding(){return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}
    @Bean public Binding emailBinding(){return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}
    @Bean public Binding noticeBinding(){return BindingBuilder.bind(noticeQueue()).to(fanoutExchange());}
    }
  2. 使用rabbitTemplate进行消息发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Service
    public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
    * 模拟用户下单
    * @param uid x
    * @param pid x
    * @param num x
    */
    public void makeOrder(String uid,String pid,int num){
    //生成订单
    String orderId = UUID.randomUUID().toString();
    System.out.println("生成订单:"+orderId);

    //通过MQ发送消息
    String exchangeName = "fanoutExchange";
    String routingKey = "";

    // 交换机 , 路由Key或队列名称, 消息内容
    rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    @Autowired
    OrderService orderService;

    @Test
    void contextLoads() {
    orderService.makeOrder("1","1",123);
    }

消费者示例

创建一个Consumer类,使用@Service注解和@RabbitListener注解

其中:@RabbitListener(queues = {"email.fanout.queue","队列名"}),表示要监听指定队列的消息信息

然后在类的方法中加注解@RabbitHandler表示该方法负责处理收到的消息。

1
2
3
4
5
6
7
8
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailConsumer {
@RabbitHandler
public void recevieMessage(String message){
System.out.println("email 收到订单信息:"+message);
}
}

DirectExchange生产者示例

与Fanout不同的是,直接将Bean Exchange的类型改为DirectExchange,然后键Binding Bean的添加方法with绑定RoutingKey即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class DirectRabbitMQConfiguration {

//声明注册fanout模式的注册机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange",true,false);
}
//声明队列:save, sms, email, notice
@Bean
public Queue saveQueue_testInDirect(){
return new Queue("save.direct.queue",true);
}
@Bean
public Queue smsQueue_testInDirect(){
return new Queue("sms.direct.queue",true);
}
@Bean
public Queue emailQueue_testInDirect(){
return new Queue("email.direct.queue",true);
}
@Bean
public Queue noticeQueue_testInDirect(){
return new Queue("notice.direct.queue",true);
}


//完成绑定关系(队列和交换机完成绑定关系)
@Bean public Binding saveBindingInDirect(){return BindingBuilder.bind(saveQueue_testInDirect()).to(directExchange()).with("save");}
@Bean public Binding smsBindingInDirect(){return BindingBuilder.bind(smsQueue_testInDirect()).to(directExchange()).with("sms");}
@Bean public Binding emailBindingInDirect(){return BindingBuilder.bind(emailQueue_testInDirect()).to(directExchange()).with("email");}
@Bean public Binding noticeBindingInDirect(){return BindingBuilder.bind(noticeQueue_testInDirect()).to(directExchange()).with("notice");}
}

TopicExchange注解方式消费者示例

这个没有可以学到的内容,建议先学@QueueBinding注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),
key = "#.email.*"
)
)
@Service
public class EmailConsumerTopic {
@RabbitHandler
public void recevieMessage(String message){
System.out.println("email topic 收到订单信息:"+message);
}
}

一些SpringBoot的配置

配置连接名,集群,地址连接方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* 配置连接名 */
@Bean
public SimplePropertyValueConnectionNameStrategy cns(){
return new SimplePropertyValueConnectionNameStrategy("自定义连接Rabbit的名字");
}

@Bean
ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
//配置连接名
cachingConnectionFactory.setConnectionNameStrategy(cns);;
//配置集群,多地址
cachingConnectionFactory.setAddresses("host1:5672,host2:5672,host3:5672");
//设置地址连接方式,默认情况下会随机选择一个地址,若不可用,则从前往后依次连接。
cachingConnectionFactory.setAddressShuffleMode(AbstractConnectionFactory.AddressShuffleMode.RANDOM);
return cachingConnectionFactory;
}
 评论