秒杀模块编写思维总结
分为两种模式一个是限时购买,一个是限裤购买。
我们这里使用的是指定时间段下面的限库购买
单独使用一个库来存储数据,下面有两张表,一张表是具体的商品,一张表是订单。用户下的订单数据都要放到redis 中,等到最后用户抢到商品并成功支付之后,在录入到数据库中,这样可以减轻数据库的压力,不会导致数据库宏机的现状。
秒杀商品的过程:在这个时间段内,用户直接下单,抢到并且支付成功之后,才可以更新到数据库中,如果没有支付成功的话,那么数据就要回滚,保证库存量正好。
普通商品的话,先要加入购物车然后再去下单,下完订单支付成功之后的话,那就要删除购物车的数据(这里不删也是可以的,做个缓存以后可以快速找到这个商品。)最后将订单号条件到数据库中,如果支付超时的话那数据也是要回滚的
1.第一个功能商品存入redis 中
package com.guoshuxiang.task; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.guoshuxiang.entity.TbSeckillGoodsModel; import com.guoshuxiang.service.TbSeckillGoodsService; import com.guoshuxiang.util.DateUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.ParseException; import java.util.Date; import java.util.List; import java.util.Set; /**主要获去指定时间的数据 ,也不能平凡刷新数据,查的数据不可以二次查找,不然东西会一直卖的 */ @Component public class SeckillJob { @Autowiredprivate TbSeckillGoodsService seckillGoodsService; @Autowiredprivate RedisTemplate redisTemplate; @Scheduled(cron = "* 0/5 * * * *") // 每隔五分钟查询一次数据库public void searchDBToRedis() {try {List<Date> dateMenus = DateUtil.getDateMenus(); // 获取时间段for (Date date : dateMenus) {String extTime = DateUtil.format(date, DateUtil.PATTERN_YYYYMMDDHH); // 获取每一次的时间段 String start = DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"); // 开始时间段String end = DateUtil.format(DateUtil.addDateHour(date, 2), "yyyy-MM-dd HH:mm:ss");//结束时间段System.out.println(start + "\t" + end);// 开始时间段和结束时间段打印一下 这里作为查询数据库的条件 QueryWrapper<TbSeckillGoodsModel> qw = new QueryWrapper<>();qw.lambda().eq(TbSeckillGoodsModel::getStatus, "1") // 商品状态.gt(TbSeckillGoodsModel::getStockCount, 0) // 剩余库存量.ge(TbSeckillGoodsModel::getStartTime, start) // 开始时间 大于等于.le(TbSeckillGoodsModel::getEndTime, end); // 小于结束时间 小于等于 //过滤已经在redis中存储的商品信息 就是获取全部的键 // 第一次执行的时候没有作用Set keys = redisTemplate.boundHashOps("goods:" + extTime).keys(); if (keys.size() > 0) {// 这里里相反的接口,只要上面查到的数据,就不应再再此出现,不然的话数据一直更新就会变成死循环,东西一直卖不完// 在这里继续拼接条件qw.lambda().notIn(TbSeckillGoodsModel::getId, keys);}// 查询满足条件的商品List<TbSeckillGoodsModel> list = seckillGoodsService.list(qw);for (TbSeckillGoodsModel goodsModel : list) {// 在这里将库存量添加进去 , 这里利用redis的 单线程调用// 大的 hash goods:时间 作为键 将商品的编号作为键 商品的数据作为值redisTemplate.boundHashOps("goods:" + extTime).put(goodsModel.getId().toString(), goodsModel);//根据商品数量,创建一个对应得队列Long[] goodsNumArray = this.ids(goodsModel.getStockCount(), goodsModel.getId());// 所有的库存量都存进去redisTemplate.boundListOps("goodsQueue:" + goodsModel.getId()).leftPushAll(goodsNumArray);}}} catch (Exception e) {e.printStackTrace();}} }
这里使用了 redis 中的 hash 也就是map 类型的数据 存的是 goods:time + 具体的商品id 和 商品的对象的数据 这个类型在redis中的键是 goods:+time 具体的键 建里面是最具体的值,但是值不为一个,所有就有了一个主键
2.第二功能获取指定的时间段下面的商品(数据 从redis 缓存中读取)
@Autowiredpublic TbSeckillGoodsService tbSeckillGoodsModelService; /*** 获取当前下面的时间段* @return*/@GetMapping("/timelist")public List<String> getTime() {// 这里是获取时间段 就放在商品的控制器中return DateUtil.getDateMenus().stream().map(item -> {try {// 获取当前下面的时间段 转化成我们指定的格式return DateUtil.format(item, DateUtil.PATTERN_YYYYMMDDHH);} catch (ParseException e) {e.printStackTrace();}return null;}).collect(Collectors.toList());} /*** 获取某个时间段下面的全部商品 从redis中获取数据* @param time YYYYMMDDHH * @return*/@GetMapping("/list/{time}")public List<TbSeckillGoodsModel> list(@PathVariable("time") String time) {return tbSeckillGoodsModelService.list(time);} /*** 获取某个时间段下面的具体商品 从redis中获取数据* @param time* @param id* @return*/@GetMapping("/one/{time}/{id}")public TbSeckillGoodsModel one(@PathVariable("time") String time, @PathVariable("id") Long id) {return tbSeckillGoodsModelService.one(time, id);}
3.第三个功能开始下单
@Overridepublic /*synchronized*/ boolean create(String username, String time, Long id) {// 从redis中获取哪个时间段下面具体商品的数据TbSeckillGoodsModel goodsModel = (TbSeckillGoodsModel)redisTemplate.boundHashOps("goods:" + time).get(id.toString());if (goodsModel.getStockCount() <= 0) { // 这里小于等于零的时候程序就结束了return false;}/* try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}*/// 创建一个下订单模型TbSeckillOrderModel orderModel = new TbSeckillOrderModel();orderModel.setSeckillId(id); // 秒杀商品的idorderModel.setMoney(goodsModel.getPrice()); // 秒杀商品的价格orderModel.setUserId(username); // 用户名orderModel.setCreateTime(new Date()); // 购买商品的时间orderModel.setStatus("0"); // 用户还没有支付redisTemplate.boundHashOps("seckillOrder").put(username, orderModel);//将订单存入redis中goodsModel.setStockCount(goodsModel.getStockCount() - 1);//将库存量减一// 如果就剩最后一减了,被抢完了,到这里就要更新数据库,和上面从redis中读取数据正好连接起来if (goodsModel.getStockCount() <= 0) { //同步数据库goodsService.updateById(goodsModel);}// 最后还是要将库存量减一的数据存入到redis中redisTemplate.boundHashOps("goods:" + time).put(id.toString(), goodsModel);return true;}
这里第二次使用到redis 中的 hash结构来存储数据 seckillOrder redis中的键 数据很多 这里的键是 username 值是 具体的订单模型
3.1注意这里会出现问题
没有使用多线程来开发的时候会出现的问题 问题一 我们的这段代码是单线程的数据,当用户很多的时候,每个人的执行速度不一致,就到导致商品卖多了, 这里我们可以加数据库的行锁但是,这样访问效率就太低了,但是我们也没有从数据库中读取数据,这里是从redis中读取数据, 我们可以使用java中的锁 synchronized 这样就不会卖超了 问题二 在锁中加了sleep 机制,就会出现超时的问题,由于线程的速度不一致导致的,
3.2解决问题二超时问题
@Async // 开启异步 这个注解是springBoot 自带的使用的时候需要再启动类哪里开启服务public void createOrder() {System.out.println("2.异步运行");//这里和上一个代码一样 这里就放方法名字create(String username, String time, Long id)System.out.println("3.异步执行结束");}
@Data @AllArgsConstructor @NoArgsConstructor // 这里封装一个数据类 正好把数据都存入到队列中去 public class SeckillStatus implements Serializable {private String time;private String username;private Long goodId;//1、排队 2、未支付 3、已支付 4、未抢到 5、超时private Integer status; }
redisTemplate.boundListOps("userQueue").leftPush(seckillStatus); // 左边进入 右边出去 数据就出去了// 这里是第三个 redis 存储数据 List 类型的数据 userQueue:值就是封装的数据 (就是下单的状态)
队列是先进先出(FIFO)的数据结构,将元素插入队列尾部,然后按照先进先出的顺序取出队列头的元素。队列的主要优势是可以实现异步处理,将任务先存储在队列中,然后由后台线程逐个处理任务,从而提高程序的稳定性和可靠性。
@Async // 开启异步 这个注解是springBoot 自带的使用的时候需要再启动类哪里开启服务public void createOrder() {System.out.println("2.异步运行");//这里和上一个代码一样 这里就放方法名字synchronized(本类.class){create(String username, String time, Long id)}System.out.println("3.异步执行结束");}
处理卖超问题,在异步的方法里面加入锁来处理部分的数据,范围更小了,锁的力度更大了
3.3个人理解
当我处理数据的时候这个功能执行时间长,但访问的人很多,这样就很容易出现超时问题,这时我们要使用异步来解决问题,现成结果出来,然后让程序继续执行,但是多线程来访问,线程是相互争抢资源的,这样安全性不高,也不稳定,这里我就要加入队列来处理,也就是异步队列,队列正好是先进先出,将数据排队给程序,一个一个的去执行,这就处理起来并发性就很更安全一点。
4.一个人只能购买一次
这里在秒杀时间段内一个人只能买一次的商品,应为人很多,一个人也就只有一次机会,但是,这里也可以防止使用脚本恶意抢票,相对来说,这是对系统的一个保护措施。
// 防止用户重复提交订单Long userGoodsCount = redisTemplate.boundHashOps("userGoodsCount").increment(username, 1);if (userGoodsCount > 1){System.out.println(username + "当前用户不能提交重复订单!");return false;}
这里是redis 的第四次使用了 userGoodsCount 还是使用 hash 类型 , 里面有一个方法 increment(hk,hv), hv 是自己动增长了。这里是一直存在redis 中的数据,一个人一次,一直保留在这里。
当时我想的时候是单算使用熔断机制,然后检查出请求头中的参数,如果里面的用户名是相同的话那就抛出一个异常数据,不允许用户在一次请求,这里想法还是有问题的,如果用户没有付钱的话订单失效,再一次购买的话,我这里就进不去了,那样的话性能就不好了。(对比还是考虑问题不太周到)
5.开始改进java中的锁 来解决卖超问题
思想是,用一个数组来读取商品的库存量,根据库存量来制定有
/*** 获取每一个商品的库存量* @param len 剩余库存量* @param id 商品的id* @return*/public Long[] ids(Integer len, Long id){Long[] arr = new Long[len];for (Integer i = 0; i < len; i++) {arr[i] = id;}return arr;} // 这里写在定时任务里面 //根据商品数量,创建一个对应得队列 Long[] goodsNumArray = this.ids(goodsModel.getStockCount(), goodsModel.getId()); // 所有的库存量都存进去 goodsQueue+id 下面都是一个对象类型的数据所以都是存放一样的还是有索的 redisTemplate.boundListOps("goodsQueue:" +goodsModel.getId()).leftPushAll(goodsNumArray); // redis 中是单线程的,到这里没有抢到票的就在这里就结束了,这里去掉百分之九十的人 // 这样就解决了卖超的问题if (ObjectUtils.isEmpty(o)) {System.out.println("没有库存了.....");return; } // 如果失败了就失眠一段时间,然后继续执行 try {//当前用户多次尝试获取锁// 没有获取到 等五秒继续尝试while (!redisUtil.lock(id.toString())) {Thread.sleep(500);} } catch (InterruptedException e) {e.printStackTrace(); } // 最后还要删除锁 redisUtil.unlock(id.toString());
这里是第五次使用redis 来存储数据 ,在redis中 使用 list 也就是队列的形式在存储数据,将需要的某个时间段下的每个商品的库存量放入队列中这样就可以缩小范围,保证有票的人可以买到东西,这里还是有可能有卖超的现象。
/*** 添加一个redis 分布式锁* @param goodsId* @return*/public boolean lock(String goodsId) {return (Boolean) redisTemplate.execute(new RedisCallback() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {byte[] key = ("lock:" + goodsId).getBytes();byte[] value = goodsId.getBytes();// 这个方法都是 字节数组 这个方法独占线程,不执行完不释放线程Boolean aBoolean = connection.setNX(key, value);if (aBoolean) { //设置成功以后保存一分钟,以后后面的程序出异常connection.expire(key, 60);}return aBoolean;}});} /*** 输出事务的锁* @param goodsId*/public void unlock(String goodsId){redisTemplate.delete("lock:"+goodsId);}
5.1突然想到一个问题
userQueue list 这个是队列 执行异步前存入数据之后异步后删除数据 ,
userSeckillStatus hash 使使用保持每个用户订单的状态,最后在筛选出下单成功的用户添加到数据中的,那是因为订单数据 seckillOrder 的状态都是未支付的那些抢到商品的人。
两个类型不一样作用也是不一样的 合在一起使用的话,就不能解决异步的安全问题,这里还是要分开来写效果更好一点。
6.来获取用户下订单的状态
//存储用户得状态,记录用户对阶段得信息查询 一开始都是排队状态 (异步之前) redisTemplate.boundHashOps("userSeckillStatus").put(username, seckillStatus); //获取状态 一开始都是排队状态 异步里面的代码 SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("userSeckillStatus").get(username); // 排队 1 一开始就是这个状态 // 没有库存修改一次状态 4 // 抢到以后没有支付 2 这里主要修改这两个地方,在这是在维护功能
userSeckillStatus 使用 hash 来存储 是 k username v SeckillStatus 订单类型
7.支付来修改订单状态
思想:主要是微信生成订单(这个订单分为普通商品和秒杀商品)支付成功以后微信会调用,将数据发送mq最后 在秒杀模块哪里消费一下,穿过来的用户名找到订单状态 修改状态 更新到redis中 ,在改变数据库中的支付状态,最后删除 seckillOrder 订单 userGoodsCount 防止用户重复订单。
if (type.equals("1")) { // 1 是秒杀的商品 2 或者其他的就是普通的商品System.out.println("发送秒杀得MQ");rabbitTemplate.convertAndSend(MQConfig.seckillOrderExchange, MQConfig.seckillOrderRouting, JSON.toJSONString(map));} else {System.out.println("发送普通订单得MQ");rabbitTemplate.convertAndSend(MQConfig.PAYEXCHANGE, MQConfig.PAYROUTING, orderNo);} // 这里开始消费数据 @RabbitListener(queues = "seckillOrderQueue")public void reviceSeckillPay(String json){System.out.println(json);// {"transaction_id":"4200002135202401058759299558","nonce_str":"bdb6a807140e444f8a9c787eeb07f4c1",// "bank_type":"OTHERS","openid":"oHkLxt7FJCV-ogLa_K-HFVmMsJhY","sign":"C48523C35F33C60D4D4B9BBDBE7B1FA1",// "fee_type":"CNY","mch_id":"11473623","cash_fee":"1","out_trade_no":"1743138065602519042",// "appid":"wxab8acb865bb1637e","total_fee":"1","trade_type":"NATIVE","result_code":"SUCCESS",// "attach":"frank45-1","time_end":"20240105101446","is_subscribe":"N","return_code":"SUCCESS"}Map<String,String> map = JSON.parseObject(json, Map.class);String attach = map.get("attach");String[] split = attach.split("-");String username = split[0]; // String type = split[1];SeckillStatus seckillStatus = (SeckillStatus)redisTemplate.boundHashOps("userSeckillStatus").get(username);//修改redis用户状态为 3tseckillStatus.setStatus(3);redisTemplate.boundHashOps("userSeckillStatus").put(username,seckillStatus); TbSeckillOrderModel tbSeckillOrderModel = (TbSeckillOrderModel)redisTemplate.boundHashOps("seckillOrder").get(username);tbSeckillOrderModel.setStatus(1+""); // 该订单已经支付过了//将redis中得订单添加到db。tbSeckillOrderService.save(tbSeckillOrderModel);//删除redis中得订单redisTemplate.boundHashOps("seckillOrder").delete(username);//删除防止用户重复提交订单得redis ->key 用户网速快可以二次购买redisTemplate.boundHashOps("userGoodsCount").delete(username);}
8.超时支付问题
//延迟队列,检查商品是否购买rabbitTemplate.convertAndSend(MQConfig.ONESECKILLEXCHANGE, MQConfig.ONESECKILLKEY, JSON.toJSONString(seckillStatus), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 获取到消息的本体给本体设置时长message.getMessageProperties().setExpiration("10000");return message;}}); // 没有消费成功的数据二次回滚@RabbitListener(queues = MQConfig.TWOSECKILLQUEUE)public void checkOrderPay(String json) {System.out.println("========>" + json);//json转换 成 UserSeckillStatus对象SeckillStatus seckillStatus = JSON.parseObject(json, SeckillStatus.class);//根据用户的username获取 userSeckillStatus 获取 id username status timeSeckillStatus userSeckillStatus = (SeckillStatus) redisTemplate.boundHashOps("userSeckillStatus").get(seckillStatus.getUsername());//验证userSeckillStatus.status==2表示延迟数据没有支付if (userSeckillStatus.getStatus() == 2) {//回滚数据//获取redis中指定时间段下的商品信息goods:serSeckillStatus.time 获取具体的商品TbSeckillGoodsModel goodsModel = (TbSeckillGoodsModel) redisTemplate.boundHashOps("goods:" + userSeckillStatus.getTime()).get(userSeckillStatus.getGoodId().toString());//验证goods:2024010514.商品编号-> stockCount<=0//如果==0 数据库数据、redis的秒杀商品数据、该商品的goodsQueue:编号 队列 回滚数据if (goodsModel.getStockCount() <= 0) {TbSeckillGoodsModel goodModel = tbSeckillGoodsService.getById(goodsModel.getId().toString());// 数据库里的数据加一,这里是以防万一,数据不对称// 缓存里面的数据也要加一,不然就会保留一件没有卖出goodModel.setStockCount(goodModel.getStockCount() + 1);tbSeckillGoodsService.updateById(goodModel);}// redis中存放的数据也要加一goodsModel.setStockCount(goodsModel.getStockCount() + 1);// redis 也回滚一下redisTemplate.boundHashOps("goods:" + userSeckillStatus.getTime()).put(userSeckillStatus.getGoodId().toString(), goodsModel);// 凭证也要 回滚一下redisTemplate.boundListOps("goodsQueue:" + userSeckillStatus.getGoodId()).leftPush(userSeckillStatus.getGoodId().toString());//修改userSeckillStatus状态为5、 未支付userSeckillStatus.setStatus(5);redisTemplate.boundHashOps("userSeckillStatus").put(seckillStatus.getUsername(), userSeckillStatus);//删除之前购买的订单seckillOrder 还有userGoodsCountredisTemplate.boundHashOps("seckillOrder").delete(seckillStatus.getUsername());// 删除就可以再买一次,不删除就无法继续购买了redisTemplate.boundHashOps("userGoodsCount").delete(seckillStatus.getUsername());}}
9.总结redis 七次结构的使用
good:time hash hk goodId hv goodModel // 存储指定时间下的模型数据 userQueue list seckillStatus 队列 先进先出 和 异步 // 解决超时问题,一个一个 给异步来处理,比原来一群来处理安全性更高 seckillOrder hash hk username hv orderModel // 存放每一用户订单 userSeckillStatus hash hk username hv // 这里可以知道每个用户的订单状态 lock:id // 加了一个分布式锁,防止卖超的现象 userGoodCount hash hk username hv int // 防止用户多次下单,提高安全性 goodQueue:id list int // 有票的人才可以下单成功