玖叶教程网

前端编程开发入门

如何在并发环境下实现商品抢购功能

抢购流程

整体设计实现主要原理

依靠redis的事务特性来实现商品的抢购功能(redis事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制, 并且事务在执行的期间不会主动中断 —— 服务器在执行完事务中的所有命令之后, 才会继续处理其他客户端的其他命令)。

WATCH

WATCH只能在客户端进入事务状态之前执行, 在事务状态下发送 WATCH 命令会引发一个错误, 但它不会造成整个事务失败, 也不会修改事务队列中已有的数据(和前面处理 MULTI 的情况一样);WATCH命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

DECRBY

如果key不存在,操作之前,key就会被置为0。如果key的value类型错误或者是个不能表示成数字的字符串,就返回错误。这个操作最多支持64位有符号的正型数字。

INCRBY

如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。本操作的值限制在 64 位(bit)有符号数字表示之内。

大概步骤

1、开启redis对某个商品库存key的监控(watch)

2、开启redis事务

3、redis自增操作加/减库存(某个商品库存key)

4、执行事务(如果watch的key的值发生了变动,则事务提交失败)

5、购买信息插入到消息队列

6、如果抢购需要进行ID购买限制,则需要在第4步后增加相应操作

7、需要对消息队列中的数据进行处理(生成用户未支付订单数据)

抢购代码清单(含测试)

/**
* @className: SecKillProduct
* @date: 2018-01-01 00:00
* @author: author
* @description:(模拟并发实现抢购商品模型:针对商品,用户,商品库存,ID购买限制进行建模)
*/
public class SecKillProduct {
/**商品key ,剩余库存value*/
private static final String PRODUCTSTOCKKEY = "iphone_stock";
/**商品名称*/
private static final String PRODUCTNAME = "iphone";
/**秒杀商品统一后缀*/
private static final String PRODUCTSTOCKKEY_SUFFIX = "_stock";
/**秒杀商品库存*/
private static final String SECKILLNUM = "10";
/**用户已经秒杀商品数前缀*/
private static final String HAS_BOUGHT_PREFIX = "has_bought_set_stock_";
/**每个用户ID可以购买的数量:设置为0代表不限量*/
private static final Integer BUYLIMITCOUNT = 1; 
public static void main(String[] args) throws InterruptedException {
long l = System.nanoTime();
Jedis jedis = RedisUtil.getConn();
jedis.set(PRODUCTSTOCKKEY, SECKILLNUM);
//并发数
int threadNum = 5000;
//CountDownLatch实现”闭锁“方便统计程序运行时长
final CountDownLatch startSignal = new CountDownLatch(threadNum);
//CyclicBarrier实现关卡,模拟实现抢购并发
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum);
for (int i = 0; i < threadNum; i++) {
new Thread(new Runnable() {
 @Override
public void run() {
try {
//一直阻塞当前线程,直到所有线程都运行至此,才进行后续代码的运行
cyclicBarrier.await();
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch (BrokenBarrierException e) {
 e.printStackTrace();
 }
//减库存和创建订单操作
String userId = "100000001";
 String resultMsg = new SecKillProduct().secKillProduct(PRODUCTNAME, userId);
 System.out.println(resultMsg);
//每运行一个线程CountDownLatch数量减一,为了能够统计到程序花费的时间和剩余库存
startSignal.countDown();
 }
 }).start();
}
//主线程等待CountDownLatch计数为零才运行
startSignal.await();
System.out.println("剩余库存:" + jedis.get(PRODUCTSTOCKKEY));
jedis.close();
long l1 = System.nanoTime();
System.out.println("一共花费" + (l1 - l)/1000/1000 + "毫秒");
}
/**
 * 秒杀商品逻辑处理
* @param productName 商品名称
* @param userId 用户ID
 * @return
*/
private String secKillProduct(String productName, String userId) {
String resultMsg = "抢购" + productName + "失败!";
Jedis jedis = null;
//使用try,在finally中释放jedis资源。异常情况下jedis对象不会被踢,这将导致jedis使用完,这样请求就无法再获取空闲连接
try {
jedis = RedisUtil.getConn();
String productStockKey = productName + PRODUCTSTOCKKEY_SUFFIX;
//开启对productStockCacheKey的监控:在事务提交的时候会检查productStockCacheKey的值是否改变,如果改变则不进行事务提交, //但是后续代码会继续运行
jedis.watch(productStockKey);
//商品库存
String stock = jedis.get(productStockKey);
//库存校验
if(stock == null || Integer.parseInt(stock) <= 0) {
resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "失败,请重试!";
return resultMsg;
}
//开启Redis事务
Transaction tx = jedis.multi();
//库存减一
tx.decrBy(productStockKey, 1);
//执行事务
List<Object> resultList = tx.exec();
//校验事务是佛提交成功
if (resultList == null || resultList.isEmpty()) {
//redis watch监控的(productStockKey的值)被更改过(同一时间只允许一个线程减库存),抢购失败;
 //[扩展:如果在执行 WATCH 命令之后, EXEC 命令或 DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。
//因为 EXEC 命令会执行事务,因此 WATCH 命令的效果已经产生了;
// 而 DISCARD 命令在取消事务的同时也会取消所有对 key 的监视,因此这两个命令执行之后,就没有必要执行 UNWATCH 了]。
resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "失败,请重试!";
return resultMsg;
}
//为用户userId加锁,防止出现用户重复购买(控制用户购买量)
synchronized (userId) {
//获取当前用户的对应商品的购买数量
String buyCount = jedis.get(HAS_BOUGHT_PREFIX + "_" + PRODUCTNAME + userId);
if(buyCount != null) {
//校验已购买数量是否大于限制数量
boolean isBuy = Integer.parseInt(buyCount) >= BUYLIMITCOUNT ? true : false;
if (isBuy){
//重复秒杀/超出ID限制购买数
resultMsg = Thread.currentThread().getName() +"抢购" + productName + "失败,您已经购买过该产品了!";
jedis.incrBy(productStockKey, 1);
return resultMsg;
}
} else {
//初始化购买数
buyCount = "0";
}
//购买数+1
buyCount = (Integer.valueOf(buyCount) + 1) + "";
//将用户购买某商品对应的数量存保存到redis中:为防止服务器挂了后这个数据不准确, //需要在web服务启动的时候根据key值更新下购买数量
String sataus = jedis.set(HAS_BOUGHT_PREFIX + "_" + PRODUCTNAME + userId, buyCount);
if("OK".equals(sataus)) {
//将商品及用户信息加入到消息队列中:这里不做具体实现:可以使用redis队列,rabbitmq,activeMQ等,
// 需要额外的线程或者定时任务来处理消息队列里面的数据(生成用户未支付订单-->支付请求的时候用户 //只需要查询自己未支付订单即可(抢购))
resultMsg = Thread.currentThread().getName() +" 抢购" + productName + "成功!";
} else {
//购买数量加入失败则增加商品可抢购库存
resultMsg = Thread.currentThread().getName() +"抢购" + productName + "失败,请重试!";
jedis.incrBy(productStockKey, 1);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.close();
}
return resultMsg;
}
}
/**
* @className: SecKillProduct
* @date: 2018-01-01 00:00
* @author: author
* @description:(redis工具类)
*/
public class RedisUtil {
private static JedisPool pool = null;
/**
 * 获取jedis连接池
* */
public static JedisPool getPool() {
if(pool == null) {
//创建jedis连接池配置
JedisPoolConfig config = new JedisPoolConfig();
//配置最大jedis实例数
config.setMaxTotal(500);
//配置资源池最大闲置数
config.setMaxIdle(20);
//等待可用连接的最大时间
config.setMaxWaitMillis(10000);
//创建redis连接池
pool = new JedisPool(config,"127.0.0.1",6379,10000);
}
return pool; }
/**
 * 获取jedis连接
* */
public static Jedis getConn() {
Jedis jedis = getPool().getResource();
jedis.auth("123456");
return jedis;
}
}

此文缺少对redis缓存key值过期时间的处理,以及抢购订单失效问题(在规定时间内用户不支付视为失效订单)

今天就分享到这里,具体操作源码,里面注释详细,可以将其复制到编辑器中查看。

希望大家多多指点,不喜勿喷,谢谢!

注意:模拟并发测试类写得有点乱,大家可以将其替换掉,主要的是secKillProduct(String productName, String userId)这个方法。需要commons-pool2-2.3.jar和redis相关的包。commons-pool1.x版本我本地测试报异常。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言