整体设计实现主要原理
依靠redis的事务特性来实现商品的抢购功能(redis事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制, 并且事务在执行的期间不会主动中断 —— 服务器在执行完事务中的所有命令之后, 才会继续处理其他客户端的其他命令)。
WATCH
WATCH只能在客户端进入事务状态之前执行, 在事务状态下发送 WATCH 命令会引发一个错误, 但它不会造成整个事务失败, 也不会修改事务队列中已有的数据(和前面处理 MULTI 的情况一样);WATCH命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
DECRBY
如果key不存在,操作之前,key就会被置为0。如果key的value类型错误或者是个不能表示成数字的字符串,就返回错误。这个操作最多支持64位有符号的正型数字。
INCRBY
如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。本操作的值限制在 64 位(bit)有符号数字表示之内。
大概步骤
1、开启redis对某个商品库存key的监控(watch)
2、开启redis事务
3、redis自增操作加/减库存(某个商品库存key)
4、执行事务(如果watch的key的值发生了变动,则事务提交失败)
5、购买信息插入到消息队列
6、如果抢购需要进行ID购买限制,则需要在第4步后增加相应操作
7、需要对消息队列中的数据进行处理(生成用户未支付订单数据)
抢购代码清单(含测试)
/** * @className: SecKillProduct * @date: 2018-01-01 00:00 * @author: author * @description:(模拟并发实现抢购商品模型:针对商品,用户,商品库存,ID购买限制进行建模) */ public class SecKillProduct { /**商品key ,剩余库存value*/ private static final String PRODUCTSTOCKKEY = "iphone_stock"; /**商品名称*/ private static final String PRODUCTNAME = "iphone"; /**秒杀商品统一后缀*/ private static final String PRODUCTSTOCKKEY_SUFFIX = "_stock"; /**秒杀商品库存*/ private static final String SECKILLNUM = "10"; /**用户已经秒杀商品数前缀*/ private static final String HAS_BOUGHT_PREFIX = "has_bought_set_stock_"; /**每个用户ID可以购买的数量:设置为0代表不限量*/ private static final Integer BUYLIMITCOUNT = 1; public static void main(String[] args) throws InterruptedException { long l = System.nanoTime(); Jedis jedis = RedisUtil.getConn(); jedis.set(PRODUCTSTOCKKEY, SECKILLNUM); //并发数 int threadNum = 5000; //CountDownLatch实现”闭锁“方便统计程序运行时长 final CountDownLatch startSignal = new CountDownLatch(threadNum); //CyclicBarrier实现关卡,模拟实现抢购并发 final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum); for (int i = 0; i < threadNum; i++) { new Thread(new Runnable() { @Override public void run() { try { //一直阻塞当前线程,直到所有线程都运行至此,才进行后续代码的运行 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } //减库存和创建订单操作 String userId = "100000001"; String resultMsg = new SecKillProduct().secKillProduct(PRODUCTNAME, userId); System.out.println(resultMsg); //每运行一个线程CountDownLatch数量减一,为了能够统计到程序花费的时间和剩余库存 startSignal.countDown(); } }).start(); } //主线程等待CountDownLatch计数为零才运行 startSignal.await(); System.out.println("剩余库存:" + jedis.get(PRODUCTSTOCKKEY)); jedis.close(); long l1 = System.nanoTime(); System.out.println("一共花费" + (l1 - l)/1000/1000 + "毫秒"); } /** * 秒杀商品逻辑处理 * @param productName 商品名称 * @param userId 用户ID * @return */ private String secKillProduct(String productName, String userId) { String resultMsg = "抢购" + productName + "失败!"; Jedis jedis = null; //使用try,在finally中释放jedis资源。异常情况下jedis对象不会被踢,这将导致jedis使用完,这样请求就无法再获取空闲连接 try { jedis = RedisUtil.getConn(); String productStockKey = productName + PRODUCTSTOCKKEY_SUFFIX; //开启对productStockCacheKey的监控:在事务提交的时候会检查productStockCacheKey的值是否改变,如果改变则不进行事务提交, //但是后续代码会继续运行 jedis.watch(productStockKey); //商品库存 String stock = jedis.get(productStockKey); //库存校验 if(stock == null || Integer.parseInt(stock) <= 0) { resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "失败,请重试!"; return resultMsg; } //开启Redis事务 Transaction tx = jedis.multi(); //库存减一 tx.decrBy(productStockKey, 1); //执行事务 List<Object> resultList = tx.exec(); //校验事务是佛提交成功 if (resultList == null || resultList.isEmpty()) { //redis watch监控的(productStockKey的值)被更改过(同一时间只允许一个线程减库存),抢购失败; //[扩展:如果在执行 WATCH 命令之后, EXEC 命令或 DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。 //因为 EXEC 命令会执行事务,因此 WATCH 命令的效果已经产生了; // 而 DISCARD 命令在取消事务的同时也会取消所有对 key 的监视,因此这两个命令执行之后,就没有必要执行 UNWATCH 了]。 resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "失败,请重试!"; return resultMsg; } //为用户userId加锁,防止出现用户重复购买(控制用户购买量) synchronized (userId) { //获取当前用户的对应商品的购买数量 String buyCount = jedis.get(HAS_BOUGHT_PREFIX + "_" + PRODUCTNAME + userId); if(buyCount != null) { //校验已购买数量是否大于限制数量 boolean isBuy = Integer.parseInt(buyCount) >= BUYLIMITCOUNT ? true : false; if (isBuy){ //重复秒杀/超出ID限制购买数 resultMsg = Thread.currentThread().getName() +"抢购" + productName + "失败,您已经购买过该产品了!"; jedis.incrBy(productStockKey, 1); return resultMsg; } } else { //初始化购买数 buyCount = "0"; } //购买数+1 buyCount = (Integer.valueOf(buyCount) + 1) + ""; //将用户购买某商品对应的数量存保存到redis中:为防止服务器挂了后这个数据不准确, //需要在web服务启动的时候根据key值更新下购买数量 String sataus = jedis.set(HAS_BOUGHT_PREFIX + "_" + PRODUCTNAME + userId, buyCount); if("OK".equals(sataus)) { //将商品及用户信息加入到消息队列中:这里不做具体实现:可以使用redis队列,rabbitmq,activeMQ等, // 需要额外的线程或者定时任务来处理消息队列里面的数据(生成用户未支付订单-->支付请求的时候用户 //只需要查询自己未支付订单即可(抢购)) resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "成功!"; } else { //购买数量加入失败则增加商品可抢购库存 resultMsg = Thread.currentThread().getName() +"抢购" + productName + "失败,请重试!"; jedis.incrBy(productStockKey, 1); } } } catch (Exception e) { e.printStackTrace(); } finally { jedis.close(); } return resultMsg; } }
/** * @className: SecKillProduct * @date: 2018-01-01 00:00 * @author: author * @description:(redis工具类) */ public class RedisUtil { private static JedisPool pool = null; /** * 获取jedis连接池 * */ public static JedisPool getPool() { if(pool == null) { //创建jedis连接池配置 JedisPoolConfig config = new JedisPoolConfig(); //配置最大jedis实例数 config.setMaxTotal(500); //配置资源池最大闲置数 config.setMaxIdle(20); //等待可用连接的最大时间 config.setMaxWaitMillis(10000); //创建redis连接池 pool = new JedisPool(config,"127.0.0.1",6379,10000); } return pool; } /** * 获取jedis连接 * */ public static Jedis getConn() { Jedis jedis = getPool().getResource(); jedis.auth("123456"); return jedis; } }
此文缺少对redis缓存key值过期时间的处理,以及抢购订单失效问题(在规定时间内用户不支付视为失效订单)
今天就分享到这里,具体操作源码,里面注释详细,可以将其复制到编辑器中查看。
希望大家多多指点,不喜勿喷,谢谢!
注意:模拟并发测试类写得有点乱,大家可以将其替换掉,主要的是secKillProduct(String productName, String userId)这个方法。需要commons-pool2-2.3.jar和redis相关的包。commons-pool1.x版本我本地测试报异常。