Redis分布式锁的原理是什么和怎么实现

1 一人一单并发安全问题

之前一人一单的业务使用的悲观锁,在分布式系统下,是无法生效的。

在最理想的情况下,如果一个线程可以成功地获取互斥锁并对订单进行查询和创建,则其他线程将无法干扰。它的原理是会有一个锁监视器,来监听是谁获得了锁。

Redis分布式锁:避免并发危机的利器

但是问题就出现在:

分布式系统下,有多个不同的JVM,不同的JVM的环境下,锁监听器是有多个的,就会出现有的线程在别的线程已经拿到锁的情况下,仍然可以获取的到锁。

这个时候,普通的JVM中的锁就已经不管用了,就需要我们利用分布式锁 。

2 分布式锁的原理和实现2.1 什么是分布式锁

就是可以满足分布式系统或集群模式下多进程可见并且互斥的锁。

它的实现原理就是,不同的JVM环境,都来共用一个锁监视器。这样就不会导致出现多个线程用多把锁的情况了。

特点:

2.2 分布式锁的实现

主要有三种实现方法,我们可以都来进行一个对比。

如下图:

这里主要讲基于Redis的分布式锁的实现 。

实现Reids分布式锁的方法主要就下面两个步骤:

1. 获取锁

使用Redis中String类型的setnx方法(保证互斥性)已经是大家熟知的获取锁的方式。为了防止redis服务器崩溃,我们需要为锁设置超时时间,以避免出现死锁。(非阻塞)

所以,获取锁的方式可以使用如下代码

SET lock thread1 nx ex 10

lock是锁的key,thread1 是value,nx就是setnx方法,ex就是设置超时时间

2. 释放锁

释放锁就简单了,删除即可。

del lock

代码实现:

需求:定义一个接口,利用Redis实现分布式锁的功能。

代码如下:

接口代码:

package com.hmdp.utils;

public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁的持有时间,过期自动释放
* @return true代表获取锁成功,false代表获取锁失败。
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unlock();

}

接口实现类:

package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

/**
* @Version 1.0
*/
public class SimpleRedisLock implements ILock {
//Redis
private StringRedisTemplate stringRedisTemplate;

//业务名称,也就是锁的名称
private String name;

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;

this.name = name;

}
//key的前缀
private static final String KEY_PREFIX = "
lock:"
;

@Override
public boolean tryLock(long timeoutSec) {
//获取线程id,当作set的value
long threadId = Thread.currentThread().getId();

Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId+"
"
, timeoutSec, TimeUnit.SECONDS);

return Boolean.TRUE.equals(success);

}
//释放锁
@Override
public void unlock() {
//删除key
stringRedisTemplate.delete(KEY_PREFIX+name);

}
}

业务层获取锁和释放锁(优惠券秒杀业务修改)

package com.hmdp.service.impl;

import com.hmdp.dto.Result;

import com.hmdp.entity.SeckillVoucher;

import com.hmdp.entity.VoucherOrder;

import com.hmdp.mapper.VoucherOrderMapper;

import com.hmdp.service.ISeckillVoucherService;

import com.hmdp.service.IVoucherOrderService;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import com.hmdp.utils.RedisIdWorker;

import com.hmdp.utils.SimpleRedisLock;

import com.hmdp.utils.UserHolder;

import org.springframework.aop.framework.AopContext;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

import java.time.LocalDateTime;

/**
* <
p>

* 服务实现类
* <
/p>

*
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<
VoucherOrderMapper, VoucherOrder>
implements IVoucherOrderService {

@Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result seckillVoucher(Long voucherId) {
//1.获取优惠券信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);

//2.判断是否已经开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
Result.fail("
秒杀尚未开始!"
);

}
//3.判断是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
Result.fail("
秒杀已经结束了!"
);

}
//4.判断库存是否充足
if (voucher.getStock() <
1) {
Result.fail("
库存不充足!"
);

}
//5.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("
stock = stock-1"
).eq("
voucher_id"
,voucherId).gt("
stock"
,0)
.update();

if (!success){
Result.fail("
库存不充足!"
);

}
Long userId = UserHolder.getUser().getId();

//1.创建锁对象
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "
order:"
+ userId);

//2.尝试获取锁
boolean isLock = lock.tryLock(1200);

if (!isLock){
//获取锁失败
return Result.fail("
一个用户只能下一单!"
);

}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();

return proxy.createVoucherOrder(voucherId);

} finally {
//释放锁
lock.unlock();

}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();

//6.根据优惠券id和用户id判断订单是否已经存在
//如果存在,则返回错误信息
int count = query().eq("
user_id"
, userId).eq("
voucher_id"
, voucherId).count();

if (count >
0) {
return Result.fail("
用户已经购买!"
);

}
//7. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();

//7.1添加订单id
Long orderId = redisIdWorker.nextId("
order"
);

voucherOrder.setId(orderId);

//7.2添加用户id
voucherOrder.setUserId(userId);

//7.3添加优惠券id
voucherOrder.setVoucherId(voucherId);

save(voucherOrder);

//8.返回订单id
return Result.ok(orderId);

}
}

Redis分布式锁是一种常见的解决并发问题的方案,可以有效避免多线程或分布式系统中出现的竞争性访问资源的问题。本文将探讨Redis分布式锁的原理和实现方法。
一、为什么需要分布式锁?
当多个进程或线程同时对一个资源进行操作时,就会出现并发问题。这时候需要对资源进行同步操作,保证同一时间只有一个进程或线程能够对资源进行操作,避免资源竞争导致的数据混乱或崩溃。
在分布式系统中,由于多个节点同时访问同一资源,需要对分布式锁进行管理,才能保证分布式系统的正常运行。
二、Redis分布式锁原理
Redis分布式锁的实现依赖于Redis的原子性操作,主要原理如下:
1.获取锁
在获取锁之前,会设置一个超时时间,确保在获取锁之后锁的过期时间可以自动失效。如果获取锁失败,则需要等待一段时间再进行下一次尝试。
使用Redis的setnx()命令可以原子性地操作字符串,如果键不存在则设置键值。通过在键上面设置一个超时时间,可以避免死锁问题的出现。
2.释放锁
需要使用Redis的del()命令删除键,确保锁已释放。为了保证锁的可重入性,需要在键的值中维护一个计数器,当计数器减到0时,才算是释放锁。
3.锁的可重入性
通过在Value中维护一个计数器,可以实现锁的可重入性。同时需要在Key的过期时间上使用一个Hash表来保存多个线程的使用锁时间和持有次数。这样,在释放锁时可以硬性判断,避免释放不属于自己的锁。
三、Redis分布式锁实现
1.单节点实现
在单节点中,可以直接使用Redis的setnx()和del()命令实现分布式锁。具体实现步骤如下:
(1)使用setnx()命令设置锁的键值,返回1表示获取锁成功,返回0表示获取锁失败;
(2)设置超时时间,防止锁被持有而无法释放;
(3)使用del()命令释放锁。
但是,这种方式存在单点故障的问题。如果Redis节点宕机,会导致锁无法及时释放,影响系统的可用性。
2.Redisson实现
为了解决单点故障问题,采用Redisson实现Redis分布式锁。Redisson基于Java客户端实现,提供了可重入锁、公平锁和联锁等多种锁类型。
使用Redisson实现Redis分布式锁的步骤如下:
(1)创建Redis客户端,并设置相应的连接参数;
(2)使用Redisson的RLock对象获取锁,返回的RFuture对象可以通过addListener()方法添加回调函数;
(3)在锁上进行相应的操作;
(4)释放锁。
Redisson还提供了监控锁的超时时间和异步锁等高级功能,可以满足不同场景的需求。
四、实现注意事项
在使用Redis分布式锁时,需要注意以下几点:
1.超时时间设置
需要在获取锁时设置超时时间,确保在获取锁之后锁的过期时间可以自动失效。同时,在释放锁时需要考虑业务处理时间,确保锁在业务逻辑处理完毕后释放。
2.锁的粒度
锁的粒度需要根据具体的业务需求进行设置。如果锁的粒度过大,容易造成大量线程的等待,影响系统性能;如果锁的粒度过小,则可能会造成死锁问题的出现。
3.锁的重入性
需要在Value中维护一个计数器,避免锁的重入问题,同时需要维护多个线程在Key的过期时间上的使用情况。
五、总结
Redis分布式锁是一种常见的解决并发问题的方案,可以有效避免多线程或分布式系统中出现的竞争性访问资源的问题。在使用Redis分布式锁时,需要注意超时时间的设置、锁的粒度和锁的重入性等问题。同时,使用Redisson等Java客户端可以更方便地实现Redis分布式锁的管理。