首页
视频
留言
壁纸
直播
下载
友链
统计
推荐
vue
在线工具
Search
1
ElasticSearch ES 安装 Kibana安装 设置密码
421 阅读
2
记一个报错GC overhead limit exceeded解决方法
344 阅读
3
Teamcity + Rancher + 阿里云Code 实现Devops 自动化部署
230 阅读
4
JAVA秒杀系统的简单实现(Redis+RabbitMQ)
209 阅读
5
分布式锁Redisson,完美解决高并发问题
206 阅读
JAVA开发
前端相关
Linux相关
电商开发
经验分享
电子书籍
个人随笔
行业资讯
其他
登录
/
注册
Search
标签搜索
AOP
支付
小说
docker
SpringBoot
XML
秒杀
K8S
RabbitMQ
工具类
Shiro
多线程
分布式锁
Redisson
接口防刷
Jenkins
Lewis
累计撰写
146
篇文章
累计收到
14
条评论
首页
栏目
JAVA开发
前端相关
Linux相关
电商开发
经验分享
电子书籍
个人随笔
行业资讯
其他
页面
视频
留言
壁纸
直播
下载
友链
统计
推荐
vue
在线工具
搜索到
1
篇与
的结果
2021-03-04
SpringBoot整合RabbitMQ实现延迟消息
整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例。1.Docker下安装RabbitMQsudo docker pull rabbitmq:management docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management docker update rabbitmq--restart=always2.RabbitMQ下增加用户及角色 3.rabbitmq的消息模型标志中文名英文名描述P生产者Producer消息的发送者,可以将消息发送到交换机C消费者Consumer消息的接收者,从队列中获取消息进行消费X交换机Exchange接收生产者发送的消息,并根据路由键发送给指定队列Q队列Queue存储从交换机发来的消息type交换机类型typedirect表示直接根据路由键(orange/black)发送消息4.SpringBoot项目集成RabbitMQ修改pom.xm文件<!--消息队列相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>修改application.ymlrabbitmq: port: 5672 host: 172.16.107.192 virtual-host: shop username: shop password: shop4.1具体业务代码添加消息队列的枚举配置类QueueEnumpackage com.bdego.modules.shop.enums; import lombok.Getter; /** * 商城订单模块消息队列枚举配置 */ @Getter public enum OrderQueueEnum { /** * 消息通知队列 */ QUEUE_ORDER_CANCEL("shop.order.direct", "shop.order.cancel", "shop.order.cancel"), /** * 消息通知ttl队列 */ QUEUE_TTL_ORDER_CANCEL("shop.order.direct.ttl", "shop.order.cancel.ttl", "shop.order.cancel.ttl"); /** * 交换机名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; OrderQueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }添加RabbitMQ的配置,用于配置交换机、队列及队列与交换机的绑定关系。package com.bdego.config; import com.bdego.modules.shop.enums.OrderQueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息队列配置 */ @Configuration public class RabbitMqConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(OrderQueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单延迟队列所绑定的交换机 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(OrderQueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单实际消费队列 */ @Bean public Queue orderQueue() { return new Queue(OrderQueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 订单延迟队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(OrderQueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", OrderQueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机 .withArgument("x-dead-letter-routing-key", OrderQueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键 .build(); } /** * 将订单队列绑定到交换机 */ @Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(OrderQueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 将订单延迟队列绑定到交换机 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(OrderQueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }交换机及队列说明shop.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为shop.order.cancel,一旦有消息以shop.order.cancel为路由键发过来,会发送到此队列。shop.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为shop.order.cancel.ttl,一旦有消息以shop.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到shop.order.cancel(取消订单消息消费队列)。添加延迟消息的发送者CancelOrderSenderpackage com.bdego.modules.shop.mq.component; import com.bdego.modules.shop.mq.enums.OrderQueueEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 取消订单消息的发出者 */ @Slf4j @Component public class CancelOrderSender { @Resource private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 amqpTemplate.convertAndSend(OrderQueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), OrderQueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); log.info("send delay message orderId:{}",orderId); } }添加取消订单消息的接收者CancelOrderReceiverpackage com.bdego.modules.shop.mq.component; import com.bdego.modules.shop.service.OrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 取消订单消息的处理者 */ @Slf4j @Component @RabbitListener(queues = "shop.order.cancel") public class CancelOrderReceiver { @Autowired private OrderService orderService; @RabbitHandler public void handle(Long orderId){ log.info("receive delay message orderId:{}",orderId); orderService.cancelOrder(orderId); } }
2021年03月04日
51 阅读
0 评论
0 点赞