SpringBoot整合RabbitMQ
依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
Fanout生产者示例
使用RabbitMQConfiguration初始化Exchange、Queue,并完成队列与交换机的绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration public class RabbitMQConfiguration {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange",true,false); } @Bean public Queue saveQueue(){ return new Queue("save.fanout.queue",true); } @Bean public Queue smsQueue(){ return new Queue("sms.fanout.queue",true); } @Bean public Queue emailQueue(){ return new Queue("email.fanout.queue",true); } @Bean public Queue noticeQueue(){ return new Queue("notice.fanout.queue",true); }
@Bean public Binding saveBinding(){return BindingBuilder.bind(saveQueue()).to(fanoutExchange());} @Bean public Binding smsBinding(){return BindingBuilder.bind(smsQueue()).to(fanoutExchange());} @Bean public Binding emailBinding(){return BindingBuilder.bind(emailQueue()).to(fanoutExchange());} @Bean public Binding noticeBinding(){return BindingBuilder.bind(noticeQueue()).to(fanoutExchange());} }
|
使用rabbitTemplate进行消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
public void makeOrder(String uid,String pid,int num){ String orderId = UUID.randomUUID().toString(); System.out.println("生成订单:"+orderId);
String exchangeName = "fanoutExchange"; String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); } }
|
1 2 3 4 5 6 7
| @Autowired OrderService orderService;
@Test void contextLoads() { orderService.makeOrder("1","1",123); }
|
消费者示例
创建一个Consumer类,使用@Service注解和@RabbitListener注解
其中,@RabbitListener(queues = {"email.fanout.queue", "队列名"}) 表示要监听指定队列的消息;
然后在类的方法上添加@RabbitHandler注解,表示该方法负责处理收到的消息。
1 2 3 4 5 6 7 8
| @RabbitListener(queues = {"email.fanout.queue"}) @Service public class EmailConsumer { @RabbitHandler public void recevieMessage(String message){ System.out.println("email 收到订单信息:"+message); } }
|
DirectExchange生产者示例
与Fanout不同的是,只需将Exchange Bean的类型改为DirectExchange,然后在Binding Bean中调用with方法绑定RoutingKey即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class DirectRabbitMQConfiguration {
@Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange",true,false); } @Bean public Queue saveQueue_testInDirect(){ return new Queue("save.direct.queue",true); } @Bean public Queue smsQueue_testInDirect(){ return new Queue("sms.direct.queue",true); } @Bean public Queue emailQueue_testInDirect(){ return new Queue("email.direct.queue",true); } @Bean public Queue noticeQueue_testInDirect(){ return new Queue("notice.direct.queue",true); }
@Bean public Binding saveBindingInDirect(){return BindingBuilder.bind(saveQueue_testInDirect()).to(directExchange()).with("save");} @Bean public Binding smsBindingInDirect(){return BindingBuilder.bind(smsQueue_testInDirect()).to(directExchange()).with("sms");} @Bean public Binding emailBindingInDirect(){return BindingBuilder.bind(emailQueue_testInDirect()).to(directExchange()).with("email");} @Bean public Binding noticeBindingInDirect(){return BindingBuilder.bind(noticeQueue_testInDirect()).to(directExchange()).with("notice");} }
|
TopicExchange注解方式消费者示例
这个没有可以学到的内容,建议先学@QueueBinding注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC), key = "#.email.*" ) ) @Service public class EmailConsumerTopic { @RabbitHandler public void recevieMessage(String message){ System.out.println("email topic 收到订单信息:"+message); } }
|
一些SpringBoot的配置
配置连接名,集群,地址连接方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Bean public SimplePropertyValueConnectionNameStrategy cns(){ return new SimplePropertyValueConnectionNameStrategy("自定义连接Rabbit的名字"); }
/**
 * Builds a CachingConnectionFactory that uses the supplied naming strategy,
 * a three-entry address list, and the RANDOM address shuffle mode.
 *
 * @param cns strategy set on the factory for naming connections
 * @return the configured connection factory
 */
@Bean
ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    final CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setConnectionNameStrategy(cns);
    // Comma-separated host:port list of candidate broker addresses.
    factory.setAddresses("host1:5672,host2:5672,host3:5672");
    factory.setAddressShuffleMode(AbstractConnectionFactory.AddressShuffleMode.RANDOM);
    return factory;
}
|