Springboot整合Redis如何实现超卖问题
写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题。
Redis中存储一项数据信息,请求对应接口,获取商品数量信息;商品数量信息如果大于0,则扣减1,重新存储Redis中;运行代码测试问题。
/*** Redis数据库操作,超卖问题模拟
* @author
*
*/
@RestController
public class RedisController {
// 引入String类型redis操作模板
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 测试数据设置接口
@RequestMapping("
/setStock"
)
public String setStock() {
stringRedisTemplate.opsForValue().set("
stock"
, "
100"
);
return "
ok"
;
}
// 模拟商品超卖代码
@RequestMapping("
/deductStock"
)
public String deductStock() {
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if(stock >
0) {
int realStock = stock -1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+realStock);
}else {
System.out.println("
库存不足....."
);
}
return "
end"
;
}
} 超卖问题单服务器单应用情况下
在单应用模式下,使用jmeter压测。
测试结果:
每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件
既然线程和线程之间,数据处理不一致,能否使用synchronized加锁测试?
设置synchronized依旧还是先测试单服务器
// 模拟商品超卖代码,// 设置synchronized同步锁
@RequestMapping("
/deductStock1"
)
public String deductStock1() {
synchronized (this) {
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if(stock >
0) {
int realStock = stock -1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+realStock);
}else {
System.out.println("
库存不足....."
);
}
}
return "
end"
;
}
数量100
重新压测,得到的日志信息如下所示:
在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!
但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?
假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,
依然会出现类似的超卖问题:
synchronized只是针对单一服务器的JVM进行加锁,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!
Redis实现分布式锁在Redis中存在一条命令setnx (set if not exists)
setnx key value如果不存在key,则可以设置成功;否则设置失败。
修改处理接口,增加key
// 模拟商品超卖代码@RequestMapping("
/deductStock2"
)
public String deductStock2() {
// 创建一个key,保存至redis
String key = "
lock"
;
// setnx
// 由于redis是一个单线程,执行命令采取“队列”形式排队!
// 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "
this is lock"
);
// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
if (!result) {
// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
return "
err"
;
}
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if(stock >
0) {
int realStock = stock -1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+realStock);
}else {
System.out.println("
库存不足....."
);
}
// 程序执行完成,则删除这个key
stringRedisTemplate.delete(key);
return "
end"
;
}
1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。2、当创建成功后,执行抢购逻辑。3、抢购逻辑执行完成后,删除数据库中对应的setnx的key。让其他请求能够设置并操作。
这种逻辑来说比之前单一使用syn合理的多,但是如果执行抢购操作中出现了异常,导致这个key无法被删除。以至于其他处理请求,一直无法拿到key,程序逻辑死锁!
可以采取try … finally进行操作
/*** 模拟商品超卖代码 设置
*
* @return
*/
@RequestMapping("
/deductStock3"
)
public String deductStock3() {
// 创建一个key,保存至redis
String key = "
lock"
;
// setnx
// 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "
this is lock"
);
// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
if (!result) {
// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
return "
err"
;
}
try {
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if (stock >
0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+ realStock);
} else {
System.out.println("
库存不足....."
);
}
} finally {
// 程序执行完成,则删除这个key
// 放置于finally中,保证即使上述逻辑出问题,也能del掉
stringRedisTemplate.delete(key);
}
return "
end"
;
}
这个逻辑相比上面其他的逻辑来说,显得更加的严谨。
但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机,导致本该执行finally中的语句未成功执行完成!!同样出现key一直存在,导致死锁!
通过超时间解决上述问题在设置成功setnx后,以及抢购代码逻辑执行前,增加key的限时。
/*** 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;<
br>
* 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;
<
br>
* 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;
<
br>
* 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。
*
* @return
*/
@RequestMapping("
/deductStock4"
)
public String deductStock4() {
// 创建一个key,保存至redis
String key = "
lock"
;
// setnx
// 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败
//boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "
this is lock"
);
//让设置key和设置key的有效时间都可以同时执行
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "
this is lock"
, 10, TimeUnit.SECONDS);
// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
if (!result) {
// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
return "
err"
;
}
// 设置key有效时间
//stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);
try {
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if (stock >
0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+ realStock);
} else {
System.out.println("
库存不足....."
);
}
} finally {
// 程序执行完成,则删除这个key
// 放置于finally中,保证即使上述逻辑出问题,也能del掉
stringRedisTemplate.delete(key);
}
return "
end"
;
}
但是上述代码的逻辑中依旧会有问题:
如果处理逻辑中,出现超时问题。当逻辑执行时,时间超过设定key有效时间,此时会出现什么问题?
从上图可以清楚的发现问题:如果一个请求执行时间超过了key的有效时间。新的请求执行过来时,必然可以拿到key并设置时间;此时的redis中保存的key并不是请求1的key,而是别的请求设置的。当请求1执行完成后,此处删除key,删除的是别的请求设置的key!
依然出现了key形同虚设的问题!如果失效一直存在,超卖问题依旧不会解决。
通过key设置值匹配的方式解决形同虚设问题既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!
修改上述代码如下所示:
/*** 模拟商品超卖代码 <
br>
* 解决`deductStock6`中,key形同虚设的问题。
*
* @return
*/
@RequestMapping("
/deductStock5"
)
public String deductStock5() {
// 创建一个key,保存至redis
String key = "
lock"
;
String lock_value = UUID.randomUUID().toString();
// setnx
//让设置key和设置key的有效时间都可以同时执行
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS);
// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
if (!result) {
// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
return "
err"
;
}
try {
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if (stock >
0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+ realStock);
} else {
System.out.println("
库存不足....."
);
}
} finally {
// 程序执行完成,则删除这个key
// 放置于finally中,保证即使上述逻辑出问题,也能del掉
// 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除
if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) {
stringRedisTemplate.delete(key);
}
}
return "
end"
;
}
由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁
最终版我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁
@Componentpublic class RedisLock {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final long acquireTimeout = 10*1000;
// 获取锁之前的超时时间(获取锁的等待重试时间)
private final int timeOut = 20;
// 获取锁之后的超时时间(防止死锁)
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 引入String类型redis操作模板
/**
* 获取分布式锁
* @return 锁标识
*/
public boolean getRedisLock(String lockName,String lockValue) {
// 1.计算获取锁的时间
Long endTime = System.currentTimeMillis() + acquireTimeout;
// 2.尝试获取锁
while (System.currentTimeMillis() <
endTime) {
//3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS);
if (result) {
return true;
}
}
return false;
}
/**
* 释放分布式锁
* @param lockName 锁名称
* @param lockValue 锁值
*/
public void unRedisLock(String lockName,String lockValue) {
if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) {
stringRedisTemplate.delete(lockName);
}
}
} @RestController
public class RedisController {
// 引入String类型redis操作模板
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisLock redisLock;
@RequestMapping("
/setStock"
)
public String setStock() {
stringRedisTemplate.opsForValue().set("
stock"
, "
100"
);
return "
ok"
;
}
@RequestMapping("
/deductStock"
)
public String deductStock() {
// 创建一个key,保存至redis
String key = "
lock"
;
String lock_value = UUID.randomUUID().toString();
try {
boolean redisLock = this.redisLock.getRedisLock(key, lock_value);
//获取锁
if (redisLock)
{
// 获取Redis数据库中的商品数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("
stock"
));
// 减库存
if (stock >
0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("
stock"
, String.valueOf(realStock));
System.out.println("
商品扣减成功,剩余商品:"
+ realStock);
} else {
System.out.println("
库存不足....."
);
}
}
} finally {
redisLock.unRedisLock(key,lock_value);
//释放锁
}
return "
end"
;
}
}
可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束
实际上这个最终版依然存在3个问题
1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。在GC操作完成并恢复后,执行del操作时,当前被加锁的key是否仍然存在?
2、问题如图所示
背景:
在电商交易中,商品的库存数量是一个关键指标。在秒杀等场景下,用户涌入的瞬间很可能会导致服务器的并发量暴增,如果没有采取相应的技术手段,就很可能会出现超卖的问题。本文将介绍如何使用Springboot整合Redis来实现超卖问题的解决方案。
一、使用Redis的原子性特性
Redis是一个内存中的数据存储系统,具有许多优秀的特性。其中最为关键的一点就是Redis的原子性。在Redis中,对于一个操作,要么全部执行成功,要么全部执行失败,不存在中间状态。这就为我们防止超卖问题提供了有力的保障。
二、使用Redis的分布式锁
除了原子性之外,Redis还拥有另外一个关键特性——分布式锁。在秒杀场景中,库存数量快速减少需要在数据库中原子性地更新库存数量,否则极易导致超卖问题的发生。此时,我们可以使用Redis的分布式锁来保证在多线程环境下同一时刻只有一个线程正在更新库存数量,避免数据更新冲突。
三、使用Redis的缓存特性
在秒杀场景下,用户会短期内频繁地访问库存数量,直接查询数据库会造成数据库压力过大的情况。此时,我们可以选择使用Redis的缓存特性,在Redis中缓存库存数量,从而避免了频繁地访问数据库,在一定程度上减轻了数据库的压力。当用户购买商品时,将会先从Redis中查询库存数量,如果库存足够则会在Redis中原子性地更新库存数量并返回成功,否则就会返回失败。
结语:
以上就是使用Springboot整合Redis来解决秒杀场景中的超卖问题的方法。简单来说,使用Redis的原子性特性、分布式锁以及缓存特性可以在保证系统高并发量的情况下,防止超卖问题的发生。