本笔记内容为尚硅谷谷粒商城订单服务锁库存事务最终一致性部分

目录

一、RabbitMQ延时队列

二、具体实现

1、 创建上述队列和路由组件

2、解锁库存

解锁库存的两种情况

1、当订单业务提交后回滚

2、订单取消解锁库存

三、关闭订单

四、消息丢失、挤压、重复等解决方案

一、RabbitMQ延时队列

场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品

常用解决方案:spring的schedule定时任务轮询数据库

缺点:消耗系统内存,增加数据库压力,存在较大的时间误差

解决:RabbitMQ的消息TTL的死信Exchange结合

消息的TTL就是消息的存活时间

RabbitMQ可以对队列和消息分别设置TTL:推荐给队列设置过期时间

对队列设置就是队列没有消费者连着的保留时间,也可以对每个单独的消息做单独的设置,超过了这个时间,我们认为这个消息就死了,称之为死信;如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列,这个消息死亡的时间有可能不一样(不同队列设置的)

二、具体实现

 

主要流程: 1、库存锁定,将信息压入延时队列中,延时队列是50min,也可以是40min,自定义即可,如果时间到了,就成了死信队列,通过路由key为stock.releace就发送给交换机,再由交换机发送给解锁库存的队列stock.release.stock.queue,然后由库存服务监听这个队列,进行库存解锁 2、订单创建成功后携带路由key=order.create.order发送给交换机,再由交换机发送给延时队列,延时时间为30min,如果时间到了,延时队列就变成了死信队列,进而带路由key=order.release.order发送给交换机,由交换机发送给关闭订单的队列order.release.order.queue,订单服务监听此队列进行关单;这样就完成了30min没有支付就关单的功能

1、 创建上述队列和路由组件

MyRabbitMQConfig.java

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.Exchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration

public class MyRabbitMQConfig {

/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

/**

* 死信队列

*

* @return

*/

@Bean

public Queue orderDelayQueue() {

/*

Queue(String name, 队列名字

boolean durable, 是否持久化

boolean exclusive, 是否排他

boolean autoDelete, 是否自动删除

Map arguments) 属性

*/

HashMap arguments = new HashMap<>();

arguments.put("x-dead-letter-exchange", "order-event-exchange");

arguments.put("x-dead-letter-routing-key", "order.release.order");

arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟

Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

return queue;

}

/**

* 普通队列处理订单

*

* @return

*/

@Bean

public Queue orderReleaseQueue() {

Queue queue = new Queue("order.release.order.queue", true, false, false);

return queue;

}

/**

* TopicExchange

*

* @return

*/

@Bean

public Exchange orderEventExchange() {

/*

* String name,

* boolean durable,

* boolean autoDelete,

* Map arguments

* */

return new TopicExchange("order-event-exchange", true, false);

}

@Bean

public Binding orderCreateBinding() {

/*

* String destination, 目的地(队列名或者交换机名字)

* DestinationType destinationType, 目的地类型(Queue、Exhcange)

* String exchange,

* String routingKey,

* Map arguments

* */

return new Binding("order.delay.queue",

Binding.DestinationType.QUEUE,

"order-event-exchange",

"order.create.order",

null);

}

@Bean

public Binding orderReleaseBinding() {

return new Binding("order.release.order.queue",

Binding.DestinationType.QUEUE,

"order-event-exchange",

"order.release.order",

null);

}

/**

* 订单释放直接和库存释放进行绑定

* @return

*/

@Bean

public Binding orderReleaseOtherBinding() {

return new Binding("stock.release.stock.queue",

Binding.DestinationType.QUEUE,

"order-event-exchange",

"order.release.other.#",

null);

}

}

2、解锁库存

引入依赖

org.springframework.boot

spring-boot-starter-amqp

主启动类添加注解

配置

spring.rabbitmq.host=192.168.88.130

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

新建mq组件

MyRabbitConfig.java

package com.atguigu.gulimail.ware.config;

import com.atguigu.gulimail.ware.entity.WareInfoEntity;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class MyRabbitConfig {

/**

* 使用JSON序列化机制,进行消息转换

*

* @return

*/

@Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

@Bean

public Exchange stockEventExchange() {

return new TopicExchange("stock-event-exchange", true, false);

}

@Bean

public Queue stockReleaseStockQueue() {

return new Queue("stock.release.stock.queue", true, false, false);

}

@Bean

public Queue stockDelayQueue() {

Map arguments = new HashMap<>();

arguments.put("x-dead-letter-exchange", "stock-event-exchange");

arguments.put("x-dead-letter-routing-key", "stock.release");

arguments.put("x-message-ttl", 120000);

return new Queue("stock.delay.queue", true, false, false, arguments);

}

@Bean

public Binding stockLockedBinding() {

return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);

}

@Bean

public Binding stockReleaseBinding() {

return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);

}

//第一次监听消息时,idea会连接rabbitMQ,此时才会创建rdbbitMQ中没有的队列、交换机和绑定关系

//如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建

// @RabbitListener(queues = "stock.release.stock.queue")

// public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException {

// System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());

// channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

// }

}

解锁库存的两种情况

业务失败,订单提交了,但是后续业务处理失败,解锁库存订单取消了,解锁库存

写个监听器监听锁定库存时存入锁定库存信息的队列

StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;

import com.rabbitmq.client.Channel;

import com.atguigu.common.to.OrderTo;

import com.atguigu.common.to.mq.StockLockedTo;

import com.atguigu.gulimall.ware.service.WareSkuService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j

@RabbitListener(queues = "stock.release.stock.queue")

@Service

public class StockReleaseListener {

@Autowired

private WareSkuService wareSkuService;

/**

* 1、库存自动解锁

* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁

*

* 2、订单失败

* 库存锁定失败

*

* 只要解锁库存的消息失败,一定要告诉服务解锁失败

*/

@RabbitHandler

public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {

log.info("******收到解锁库存的信息******");

try {

//当前消息是否被第二次及以后(重新)派发过来了

// Boolean redelivered = message.getMessageProperties().getRedelivered();

//解锁库存

wareSkuService.unlockStock(to);

// 手动删除消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

} catch (Exception e) {

// 解锁失败 将消息重新放回队列,让别人消费

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

@RabbitHandler

public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {

log.info("******收到订单关闭,准备解锁库存的信息******");

try {

wareSkuService.unlockStock(orderTo);

// 手动删除消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

} catch (Exception e) {

// 解锁失败 将消息重新放回队列,让别人消费

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

}

1、当订单业务提交后回滚

WareSkuServiceImpl.java

@Override

public void unlockStock(StockLockedTo to) {

StockDetailTo detail = to.getDetail();

Long detailId = detail.getId();

//解锁库存

//1.查询关于这个订单的锁定库存信息

WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);

if (orderTaskDetailEntity != null) {

//有,库存锁定成功,根据订单情况解锁

Long id = to.getId();//库存工作单Id

WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);

String orderSn = taskEntity.getOrderSn();

R r = orderFeignService.getOrderStatus(orderSn);

if (r.getCode() == 0) {

OrderVo data = r.getData(new TypeReference() {

});

if (data == null || data.getStatus() == 4) {

//没有这个订单 或者 有订单但订单状态是已取消,解锁库存

//只有状态是1,才能解锁

if (orderTaskDetailEntity.getLockStatus() == 1) {

unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);

}

}

} else {

//其它状态(包含订单成功)不解锁

throw new RuntimeException("远程服务失败");

}

} else {

//没有,库存锁定失败,库存回滚,这种情况无需解锁

}

}

需要远程查询订单状态再调用解锁方法

远程接口

package com.atguigu.gulimall.ware.feign;

import com.atguigu.common.utils.R;

import org.springframework.cloud.openfeign.FeignClient;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

@FeignClient("gulimall-order")

public interface OrderFeignService {

@GetMapping(value = "/order/order/status/{orderSn}")

R getOrderStatus(@PathVariable("orderSn") String orderSn);

}

订单服务下

OrderController.java

/**

* 根据订单编号查询订单状态

* @param orderSn

* @return

*/

@GetMapping(value = "/status/{orderSn}")

public R getOrderStatus(@PathVariable("orderSn") String orderSn) {

OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);

return R.ok().setData(orderEntity);

}

OrderServiceImpl.java

/**

* 按照订单号获取订单信息

* @param orderSn

* @return

*/

@Override

public OrderEntity getOrderByOrderSn(String orderSn) {

OrderEntity orderEntity = this.baseMapper.selectOne(new QueryWrapper().eq("order_sn", orderSn));

return orderEntity;

}

2、订单取消解锁库存

WareSkuServiceImpl.java

/*

*防止因为订单服务故障,导致订单状态未改变,从而无法解锁库存

*/

@Transactional

@Override

public void unlockStock(OrderTo orderTo) {

String orderSn = orderTo.getOrderSn();

//查询最新库存状态

WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);

Long id = task.getId();

List list = orderDetailService.list(new QueryWrapper().eq("task_id", id)

.eq("lock_status", 1));

for (WareOrderTaskDetailEntity entity : list) {

unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());

}

}

public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {

//库存解锁

wareSkuDao.unlockStock(skuId, wareId, num);

//更新库存工作单状态

WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();

entity.setId(taskDetailId);

entity.setLockStatus(2);

orderDetailService.updateById(entity);

}

wareOrderTaskService.getOrderTaskByOrderSn()

@Override

public WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn) {

WareOrderTaskEntity task = this.getOne(new QueryWrapper().eq("order_sn", orderSn));

return task;

}

wareSkuDao.unlockStock(skuId, wareId, num)

UPDATE `wms_ware_sku` SET stock_locked = IFNULL(stock_locked,0) - #{num}

WHERE sku_id = #{skuId} AND ware_id = #{wareId}

三、关闭订单

回到提交订单的业务,当提交订单时库存锁定成功,给死信队列发消息,开始执行关闭订单的业务,删除购物车数据

OrderServiceImpl.java下的 submitOrder 方法

if (r.getCode() == 0) {

//锁定成功

responseVo.setOrder(order.getOrder());

// int i = 10/0;

//TODO 订单创建成功,发送消息给MQ

rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());

//删除购物车里的数据

redisTemplate.delete(CART_PREFIX+memberResponseVo.getId());

return responseVo;

} else {

//锁定失败

String msg = (String) r.get("msg");

throw new NoStockException(msg);

// responseVo.setCode(3);

// return responseVo;

}

监听普通队列的消息。当普通队列中监听到消息,说明死信队列订单过期,返还给普通队列来处理关闭订单的消息。

OrderCloseListener.java

@Component

@RabbitListener(queues = "order.release.order.queue")

public class OrderCloseListener {

@Autowired

OrderService orderService;

@RabbitHandler

public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {

try {

System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn());

orderService.closeOrder(entity);

channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

System.out.println("订单关闭异常,库存解锁异常" + e.getMessage());

channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);

}

}

}

关闭订单、解锁库存 订单关闭时发送消息给库存服务的普通队列,让库存服务解锁库存。

OrderServiceImpl.java

/**

* 关闭订单

*

* @param entity

*/

@Override

public void closeOrder(OrderEntity entity) {

//查询订单最新状态

OrderEntity orderEntity = this.getById(entity.getId());

if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {

//关闭订单

OrderEntity update = new OrderEntity();

update.setId(entity.getId());

update.setStatus(OrderStatusEnum.CANCLED.getCode());

this.updateById(update);

OrderTo orderTo = new OrderTo();

BeanUtils.copyProperties(orderEntity, orderTo);

try {

//每一条消息进行日志记录(数据库保存每一条消息的详细信息)

//定期扫描数据库将失败的消息再发送一遍

rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);

} catch (Exception e) {

//将没法送成功的消息进行重试发送

}

}

}

四、消息丢失、挤压、重复等解决方案

保证消息一定会发送出去,每一个消息都可以做好日志记录 

CREATE TABLE `mq_message` (

`message_id` char(32) NOT NULL,

`content` text,

`to_exchane` varchar(255) DEFAULT NULL,

`routing_key` varchar(255) DEFAULT NULL,

`class_type` varchar(255) DEFAULT NULL,

`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',

`create_time` datetime DEFAULT NULL,

`update_time` datetime DEFAULT NULL,

PRIMARY KEY (`message_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

结束!

文章来源

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。