欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 黑马商城(七)MQ高级

黑马商城(七)MQ高级

2025/5/5 14:35:40 来源:https://blog.csdn.net/DDDiccc/article/details/147411179  浏览:    关键词:黑马商城(七)MQ高级

一、消息发送者可靠性

发送者重连:

发送者确认:

SpringAMQP实现发送者确认: 

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("监听到了消息的return callback");log.debug("exchange: {}",returnedMessage.getExchange());log.debug("routingKey: {}",returnedMessage.getRoutingKey());log.debug("message: {}",returnedMessage.getMessage());log.debug("replyCode: {}",returnedMessage.getReplyCode());log.debug("replyText: {}",returnedMessage.getReplyText());}});}
}
@Testpublic void testConfirmCallback(){//创建correlationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("spring amqp 处理结果异常",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功if (result.isAck()){//成功log.debug("收到ConfirmCallback ack, 消息发送成功!");}else {log.debug("收到ConfirmCallback nack, 消息发送失败! reason:{}",result.getReason());}}});//1.交换机名String exName="hmall.fanout";//2.消息String message="hello.fuckeveryone!";//3.发送消息rabbitTemplate.convertAndSend(exName,null,message,cd);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}

二、MQ的可靠性

数据持久化:

 MQ重启之后持久化消息还会存在

持久化效率高于非持久化

Lazy Queue:

 总结:

三、消费者的可靠性

消费者确认机制:

nack--再次投递--直到完全失败(宕机)---重新保留消息

失败重试机制 :

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.50.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true

@Configuration
@RequiredArgsConstructor
public class ErrorMessageConfiguration {@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorQueueBinding(Queue errorQueue,DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}//失败处理策略@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

业务幂等性:

保持幂等性方案--唯一消息ID: 

@Configuration
public class MessageConverterConfiguration {@Beanpublic MessageConverter messageConverter(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}
    @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(/*String*/Message message){log.info("监听到simple.queue的消息ID:{}",message.getMessageProperties().getMessageId()  );log.info("监听到simple.queue的消息:{}",new String(message.getBody())  );// throw new RuntimeException("我是故意的");}

保持幂等性方案--业务判断: 

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue",durable = "true"),exchange = @Exchange(name = "pay.direct",type = ExchangeTypes.DIRECT),key = {"pay.success"}))public void listenPaySuccess(Long orderId){//1. 查询订单Order order = orderService.getById(orderId);//2. 判断订单是否为已支付if(order == null || order.getStatus() != 1){//不做处理return ;}//3. 标记订单为已支付orderService.markOrderPaySuccess(orderId);}

四、可靠性总结

五、延迟消息 (兜底方案)

死信交换机:

@Configuration
public class NormalConfiguration {@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal.direct");/* return ExchangeBuilder.fanoutExchange("hamll.fanout").build();*/}@Beanpublic Queue normalQueue(){return QueueBuilder.durable("normal.queue")  //指定队列名.deadLetterExchange("dlx.direct") //设置死信交换机名.build();/*return new Queue("fanout.queue1");*/}@Beanpublic Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange){return BindingBuilder.bind(normalQueue).to( normalExchange).with("hi");}}
    @Testpublic void testSendMessage2(){rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息过期时间message.getMessageProperties().setExpiration("10000");return message;}});}

延迟消息插件:

插件使用  : 黑马微服务 飞书

声明交换机:

 发消息:

案例---实现取消超时订单: 

    @Override@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.订单数据Order order = new Order();// 1.1.查询商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.获取商品id和数量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查询商品/*List<ItemDTO> items = itemService.queryItemByIds(itemIds);*/List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品价格、购买数量计算商品总价:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它属性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.将Order写入数据库order表中save(order);// 2.保存订单详情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);// 3.清理购物车商品/*       cartService.removeByItemIds(itemIds);*///cartClient.deleteCartItemByIds(itemIds);// 4.扣减库存try {itemClient.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("库存不足!");}//5. 发送延迟i消息,检测订单支付状态rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE_NAME,MQConstants.DELAY_ORDER_KEY,order.getId(), message -> {message.getMessageProperties().setDelay(10000);return message;});//异步通知rabbitTemplate.convertAndSend("trade.topic", "order.create", itemIds, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {Long userId=UserContext.getUser();log.info("清理购物车商品消息发送成功,用户id: {}",userId);message.getMessageProperties().setHeader("userId",userId);return message;}});return order.getId();}

@RequiredArgsConstructor
@Component
public class OrderDelayMessageListener {private final  IOrderService orderService;private final PayClient payClient;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),key = MQConstants.DELAY_ORDER_KEY))public void listenOrderDelayMessage(Long orderId){//1. 查询订单状态Order order = orderService.getById(orderId);//2. 检测订单状态 判断是否支付if(order == null || order.getStatus() != 1){//订单不存在或者未支付return ;}//3. 未支付需要查询支付流水状态再次确定PayOrderDTO payorder = payClient.queryPayOrderByBizOrderNo(orderId);//4. 判断是否已支付if(payorder != null && payorder.getStatus() ==3 ){//4.1 支付则标记订单状态为已支付orderService.markOrderPaySuccess(orderId);}else {//4.2 未支付则取消订单 恢复库存orderService.cancelOrder(orderId);}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词