Redis分布式锁实现的方法是什么

    一、分布式锁是什么

    分布式锁是 满足分布式系统或集群模式下多进程可见并且互斥的锁。

    基于Redis实现分布式锁:

    1、获取锁
    • 互斥:确保只能有一个线程获取锁;

    • 使用Redis分布式锁:实现并发控制神器

      非阻塞:尝试获取锁,成功返回true,失败返回false;

    添加锁过期时间,避免服务宕机引起死锁。

    SET lock thread1 NX EX 10

    2、释放锁
    • 手动释放;DEL key1

    • 超时释放,获取锁时添加一个超时锁;

    二、代码实例package com.guor.utils;


    import org.springframework.data.redis.core.StringRedisTemplate;


    import java.util.concurrent.TimeUnit;


    public class RedisLock implements ILock{

    private String name;

    private StringRedisTemplate stringRedisTemplate;


    public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;

    this.stringRedisTemplate = stringRedisTemplate;

    }

    private static final String KEY_PREFIX = "
    lock:"
    ;


    @Override
    public boolean tryLock(long timeout) {
    // 获取线程唯一标识
    long threadId = Thread.currentThread().getId();

    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue()
    .setIfAbsent(KEY_PREFIX + name, threadId+"
    "
    , timeout, TimeUnit.SECONDS);

    // 防止拆箱的空指针异常
    return Boolean.TRUE.equals(success);

    }

    @Override
    public void unlock() {
    stringRedisTemplate.delete(KEY_PREFIX + name);

    }
    } 上面代码存在锁误删问题:
  • 如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;

  • 此时,线程2尝试获取锁,成功,并执行业务;

  • 此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);

  • 但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题;

  • 在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。

    package com.guor.utils;


    import cn.hutool.core.lang.UUID;

    import org.springframework.data.redis.core.StringRedisTemplate;


    import java.util.concurrent.TimeUnit;


    public class RedisLock implements ILock{

    private String name;

    private StringRedisTemplate stringRedisTemplate;


    public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;

    this.stringRedisTemplate = stringRedisTemplate;

    }

    private static final String KEY_PREFIX = "
    lock:"
    ;

    private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + "
    -"
    ;


    @Override
    public boolean tryLock(long timeout) {
    // 获取线程唯一标识
    String threadId = UUID_PREFIX + Thread.currentThread().getId();

    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue()
    .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);

    // 防止拆箱的空指针异常
    return Boolean.TRUE.equals(success);

    }

    @Override
    public void unlock() {
    // 获取线程唯一标识
    String threadId = UUID_PREFIX + Thread.currentThread().getId();

    // 获取锁中的标识
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

    // 判断标示是否一致
    if(threadId.equals(id)) {
    // 释放锁
    stringRedisTemplate.delete(KEY_PREFIX + name);

    }
    }
    } 三、基于SETNX实现的分布式锁存在下面几个问题1、不可重入

    同一个线程无法多次获取同一把锁。

    2、不可重试

    获取锁只尝试一次就返回false,没有重试机制。

    3、超时释放

    锁的超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。

    4、主从一致性

    如果Redis是集群部署的,主从同步存在延迟,当主机宕机时,此时会选一个从作为主机,但是此时的从没有锁标识,此时,其它线程可能会获取到锁,导致安全问题。

    四、Redisson实现分布式锁

    Redisson是基于Redis实现的操作Java内存数据网格。除了提供常用的分布式Java对象,它还提供了许多分布式服务,其中包括各种分布式锁的实现。

    1、pom<
    !--redisson-->

    <
    dependency>

    <
    groupId>
    org.redisson<
    /groupId>

    <
    artifactId>
    redisson<
    /artifactId>

    <
    version>
    3.13.6<
    /version>

    <
    /dependency>
    2、配置类package com.guor.config;


    import org.redisson.Redisson;

    import org.redisson.api.RedissonClient;

    import org.redisson.config.Config;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;


    @Configuration
    public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
    // 配置
    Config config = new Config();


    /**
    * 单点地址useSingleServer,集群地址useClusterServers
    */
    config.useSingleServer().setAddress("
    redis://127.0.0.1:6379"
    ).setPassword("
    123456"
    );

    // 创建RedissonClient对象
    return Redisson.create(config);

    }
    } 3、测试类package com.guor;


    import lombok.extern.slf4j.Slf4j;

    import org.junit.jupiter.api.BeforeEach;

    import org.junit.jupiter.api.Test;

    import org.redisson.api.RLock;

    import org.redisson.api.RedissonClient;

    import org.springframework.boot.test.context.SpringBootTest;


    import javax.annotation.Resource;

    import java.util.concurrent.TimeUnit;


    @Slf4j
    @SpringBootTest
    class RedissonTest {

    @Resource
    private RedissonClient redissonClient;


    private RLock lock;


    @BeforeEach
    void setUp() {
    // 获取指定名称的锁
    lock = redissonClient.getLock("
    nezha"
    );

    }

    @Test
    void test() throws InterruptedException {
    // 尝试获取锁
    boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

    if (!isLock) {
    log.error("
    获取锁失败"
    );

    return;

    }
    try {
    log.info("
    哪吒最帅,哈哈哈"
    );

    } finally {
    // 释放锁
    lock.unlock();

    }
    }
    } 五、探索tryLock源码1、tryLock源码尝试获取锁public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 最大等待时间
    long time = unit.toMillis(waitTime);

    long current = System.currentTimeMillis();

    long threadId = Thread.currentThread().getId();

    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);

    if (ttl == null) {
    return true;

    } else {
    // 剩余等待时间 = 最大等待时间 - 获取锁失败消耗的时间
    time -= System.currentTimeMillis() - current;

    if (time <
    = 0L) {// 获取锁失败
    this.acquireFailed(waitTime, unit, threadId);

    return false;

    } else {
    // 再次尝试获取锁
    current = System.currentTimeMillis();

    // subscribe订阅其它释放锁的信号
    RFuture<
    RedissonLockEntry>
    subscribeFuture = this.subscribe(threadId);

    // 当Future在等待指定时间time内完成时,返回true
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
    subscribeFuture.onComplete((res, e) ->
    {
    if (e == null) {
    // 取消订阅
    this.unsubscribe(subscribeFuture, threadId);

    }

    });

    }

    this.acquireFailed(waitTime, unit, threadId);

    return false;
    // 获取锁失败
    } else {
    try {
    // 剩余等待时间 = 剩余等待时间 - 获取锁失败消耗的时间
    time -= System.currentTimeMillis() - current;

    if (time <
    = 0L) {
    this.acquireFailed(waitTime, unit, threadId);

    boolean var20 = false;

    return var20;

    } else {
    boolean var16;

    do {
    long currentTime = System.currentTimeMillis();

    // 重试获取锁
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);

    if (ttl == null) {
    var16 = true;

    return var16;

    }
    // 再次失败了,再看一下剩余时间
    time -= System.currentTimeMillis() - currentTime;

    if (time <
    = 0L) {
    this.acquireFailed(waitTime, unit, threadId);

    var16 = false;

    return var16;

    }
    // 再重试获取锁
    currentTime = System.currentTimeMillis();

    if (ttl >
    = 0L &
    &
    ttl <
    time) {
    // 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false
    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

    } else {
    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);

    }
    time -= System.currentTimeMillis() - currentTime;

    } while(time >
    0L);


    this.acquireFailed(waitTime, unit, threadId);

    var16 = false;

    return var16;

    }
    } finally {
    this.unsubscribe(subscribeFuture, threadId);

    }
    }
    }
    }
    } 2、重置锁的有效期private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();

    // this.getEntryName():锁的名字,一个锁对应一个entry
    // putIfAbsent:如果不存在,将锁和entry放到map里
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);

    if (oldEntry != null) {
    // 同一个线程多次获取锁,相当于重入
    oldEntry.addThreadId(threadId);

    } else {
    // 如果是第一次
    entry.addThreadId(threadId);

    // 更新有效期
    this.renewExpiration();

    }
    }

    更新有效期,递归调用更新有效期,永不过期

    private void renewExpiration() {
    // 从map中得到当前锁的entry
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());

    if (ee != null) {
    // 开启延时任务
    Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    public void run(Timeout timeout) throws Exception {
    RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());

    if (ent != null) {
    // 取出线程id
    Long threadId = ent.getFirstThreadId();

    if (threadId != null) {
    // 刷新有效期
    RFuture<
    Boolean>
    future = RedissonLock.this.renewExpirationAsync(threadId);

    future.onComplete((res, e) ->
    {
    if (e != null) {
    RedissonLock.log.error("
    Can'
    t update lock "
    + RedissonLock.this.getName() + "
    expiration"
    , e);

    } else {
    if (res) {
    // 递归调用更新有效期,永不过期
    RedissonLock.this.renewExpiration();

    }
    }
    });

    }
    }
    }
    }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
    // 10S
    ee.setTimeout(task);

    }
    } 更新有效期protected RFuture<
    Boolean>
    renewExpirationAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    // 判断当前线程的锁是否是当前线程
    "
    if (redis.call('
    hexists'
    , KEYS[1], ARGV[2]) == 1) then
    // 更新有效期
    redis.call('
    pexpire'
    , KEYS[1], ARGV[1]);

    return 1;

    end;

    return 0;
    "
    ,
    Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));

    } 3、调用lua脚本<
    T>
    RFuture<
    T>
    tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<
    T>
    command) {
    // 锁释放时间
    this.internalLockLeaseTime = unit.toMillis(leaseTime);

    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
    // 判断锁成功
    "
    if (redis.call('
    exists'
    , KEYS[1]) == 0) then
    redis.call('
    hincrby'
    , KEYS[1], ARGV[2], 1);
    // 如果不存在,记录锁标识,次数+1
    redis.call('
    pexpire'
    , KEYS[1], ARGV[1]);
    // 设置锁有效期
    return nil;
    // 相当于Java的null
    end;

    if (redis.call('
    hexists'
    , KEYS[1], ARGV[2]) == 1) then
    redis.call('
    hincrby'
    , KEYS[1], ARGV[2], 1);
    // 如果存在,判断锁标识是否是自己的,次数+1
    redis.call('
    pexpire'
    , KEYS[1], ARGV[1]);
    // 设置锁有效期
    return nil;

    end;

    // 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称
    return redis.call('
    pttl'
    , KEYS[1]);
    "
    ,
    Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));

    } 六、释放锁unlock源码1、取消更新任务public RFuture<
    Void>
    unlockAsync(long threadId) {
    RPromise<
    Void>
    result = new RedissonPromise();

    RFuture<
    Boolean>
    future = this.unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) ->
    {
    // 取消更新任务
    this.cancelExpirationRenewal(threadId);

    if (e != null) {
    result.tryFailure(e);

    } else if (opStatus == null) {
    IllegalMonitorStateException cause = new IllegalMonitorStateException("
    attempt to unlock lock, not locked by current thread by node id: "
    + this.id + "
    thread-id: "
    + threadId);

    result.tryFailure(cause);

    } else {
    result.trySuccess((Object)null);

    }
    });

    return result;

    } 2、删除定时任务void cancelExpirationRenewal(Long threadId) {
    // 从map中取出当前锁的定时任务entry
    RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());

    if (task != null) {
    if (threadId != null) {
    task.removeThreadId(threadId);

    }
    // 删除定时任务
    if (threadId == null || task.hasNoThreads()) {
    Timeout timeout = task.getTimeout();

    if (timeout != null) {
    timeout.cancel();

    }

    EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());

    }
    }
    }

    分布式系统下的并发控制一直是一个难点,特别是在高并发下,锁竞争会严重影响系统性能和稳定性。Redis分布式锁是一种基于Redis实现的分布式锁,下面介绍Redis分布式锁的实现方法。
    1. 建立连接
    使用Redis分布式锁的第一步是建立Redis连接,通过连接池管理已连接的Redis实例,避免了每次请求都需要重新建立连接的开销,并且提高了Redis的使用效率。
    2. 加锁与解锁
    通过Redis的setnx命令实现分布式锁,setnx会尝试将数据写入Redis服务器,如果写入成功表示获得了锁,否则就表示已经有其他客户端获得了锁。当获得锁后,可以设置过期时间,避免实例意外宕机或者程序异常退出等情况下锁没有正常释放的问题。解锁时可以使用Redis的del命令删除锁对应的key。
    3. 重入锁和可重入锁
    重入锁可以让同一个客户端在锁已经被自己持有的情况下继续获取到锁,避免了死锁的问题。可重入锁则是指同一个线程可以多次获取锁,同样能够避免死锁问题,并且能够更方便地编写业务逻辑。
    4. 优化锁机制
    基于Redis实现的分布式锁有许多优化手段,例如使用Lua脚本保证加锁操作原子性、使用Redlock算法提高锁粒度和可靠性等方法,在极高并发下仍然能够保证性能和稳定性。
    综上所述,Redis分布式锁是非常实用的并发控制工具,通过对建立连接、加锁与解锁、重入锁和可重入锁、优化锁机制等方面进行深入学习,编写高性能、高可用性分布式系统将更加容易。