Redis实战(4)——Redisson分布式锁

1 基于互斥命令实现分布式锁的弊端

根据上篇文章基于redis互斥命令实现的分布式锁任然存在一定的弊端

  • 1无法重入: 同一个线程无法重新获得同一把锁
  • 2超时删除 :会因为超时、任务阻塞而自动释放锁,出现其他线程抢占锁出现并行导致线程不安全的问题
  • 3 不可重试: 基于setnx 互斥指令实现的非阻塞式分布式锁在获取不到锁时将会立即返回,没有重试机制
  • 4主从一致性: 如果Redis提供了主从同步,主从同步时出现了延迟时,会出现无法判定当前线程锁的状态,出现线程不安全的问题。因为一般写指令【setnx】向主Redis操作,读指令【get key】向从Redis操作,即主从分离的情形。

2 Redisson

为了解决上述基于互斥命令实现的锁出现的问题,可以选择使用Redisson分布式锁方案。
Redisson不仅仅是一个Redis客户端,它还实现了很多具有分布式特性的常用工具类。
引入依赖

        <!--redisson 分布式锁-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.5.6</version>
        </dependency>

创建Redisson的客户端

@Configuration
public class RedissonClientConfig {
    
    @Bean
    public RedissonClient redissonClient(){
        // 配置类
        Config config = new Config();
        // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setDatabase(1);
        // 创建客户端
        return Redisson.create(config);
    }
}

3 Redisson 分布式锁实现原理

3.1 Redisson实现可重入锁原理

相较于redis使用互斥指令setnex 创建的分布式锁,存储的数据结构为 S t r i n g \textcolor{red}{String} String,Redisson存储的数据结构为 H a s h \textcolor{red}{Hash} Hash,线程在尝试获得锁的时候,除了存储当前线程标识之外,还会存储锁的重入次数。
同一个线程获得锁时,重入次数加一。释放锁时,重入次数减一,直至减至0时redis数据库删除该key的信息。为了基于原子性操作,Redisson获得锁和释放锁的逻辑都是基于Lua脚本实现的

在这里插入图片描述
执行获得锁的底层代码执行 L u a 脚本 \textcolor{red}{执行获得锁的底层代码执行Lua脚本} 执行获得锁的底层代码执行Lua脚本

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //判定是否存在锁
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      //不存在,初次设置锁标识,重入次数为1
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      //设置锁的过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //判定是否为当前线程id 持有锁
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      // 是,则锁的重入次数加一
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      //更新锁持有的有效时长
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //未获得锁,返回当前锁的剩余的有效期时间 【单位ms】
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

KEYS[1] 锁的key
ARGV[1] 有效时长
ARGV[2] 锁的线程标识

释放锁的代码执行 L u a 脚本 \textcolor{red}{释放锁的代码执行Lua脚本} 释放锁的代码执行Lua脚本

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                 //锁不存在时,发布锁释放的消息
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                //不是当前线程标识,无法释放锁
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                //判定为当前线程标识持有锁,锁重入次数减一
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    //重入次数未减至0 时,更新锁的持有时间
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    //重入次数减至0时,redis删除锁,锁成功释放
                    "redis.call('del', KEYS[1]); " +
                    //发布锁成功释放的消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

3.2 锁重试与锁的超时释放问题

在内部Redisson获得锁时,会执行以下方法。当成功获得锁时,返回null,获得锁失败时,获得锁剩余的持有时间。利用redis的订阅和信号量机制,在设定的等待时间内尝试重试获得锁。做到了锁重试,且不是无休止的盲目等待去获得锁的信息。
至于锁的超时释放问题,redisson 提供了watchdog机制,当不设定锁的超时时间,即默认设置为-1 时,利用watchdog机制,每隔一段时间 (internalLockLeaseTime 3),重置锁的有效时长

在这里插入图片描述

锁在最大等待时间内进行锁重试 \textcolor{red}{锁在最大等待时间内进行锁重试} 锁在最大等待时间内进行锁重试

    //超时释放时间,时间单位,线程标识id
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        //超时时间转化为ms,存入内部成员变量中
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //锁不存在的时候,设置锁
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //获得锁失败,获得锁的剩余持有时间
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //设定的锁等待时间,转为ms 单位
        long time = unit.toMillis(waitTime);
        //当前时间
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        //获得锁的剩余有效时间。根据上述方法值返回
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // ttl=null,标识锁获得成功,返回true
        if (ttl == null) {
            return true;
        }
        
        //更新锁等待时长,减去初次获得锁的时间
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            //等待时长<0,即已超出设定的超时等待时长。获得锁失败
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅锁释放的信息
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //在剩余等待时长中等待看是否有其它线程释放锁的信息,没有收到释放锁的信息
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            //超时,取消锁释放的订阅
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            //判定没有收到锁释方的消息,获得锁失败
            acquireFailed(threadId);
            return false;
        }

        try {
            //再次更新锁的等待时间
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                //<0,即超时,获得锁失败
                acquireFailed(threadId);
                return false;
            }
        
            //进入重新获得锁逻辑
            while (true) {
                long currentTime = System.currentTimeMillis();
                //重新获得锁的剩余时间
                ttl = tryAcquire(leaseTime, unit, threadId);
                //null,成功获得锁
                if (ttl == null) {
                    return true;
                }

                //没有获得锁成功,更新锁的等待时间
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                     //等待时间<0,即超时,获得锁失败
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

锁超时—— w a t c h d o g 机制 \textcolor{red}{锁超时——watchdog机制} 锁超时——watchdog机制

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        //当设定超时时间为-1时
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    //更新锁的有效时长 。
                    //scheduleExpirationRenewal是一个定时任务。任务的间隔时间是 internalLockLeaseTime / 3 。internalLockLeaseTime 设定的是30s,即锁的watchdog时间。直到用户显示的执行unlock()方法,取消该定时任务,锁成功释放。watchdog机制保证了锁的超时释放问题。
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

3.3 Redisson主从一致性解决方案

在redis 集群模式下,由于主从节点 读写分离 \textcolor{red}{读写分离} 读写分离,出现的因为主从同步出现延迟或主从同步数据失败会导致多个线程获得锁出现线程不安全问题。在Redisson中可以采用RedissonMultiLock【联锁,把多个锁联合成一把锁来看待】来解决。
即每一个redis节点都当成Master节点来看待,在获得锁时,必须每一个Redis节点都获得锁成功才算成功,释放锁时需要每一个Redis节点都释放锁成功才算成功。

M u l t i L o c k 的使用 \textcolor{red}{MultiLock的使用} MultiLock的使用

// 初始化三个锁 ,指向不同的redis节点
RLock lock1 = redissonClient.getLock("lockName1");
RLock lock2 = redissonClient.getLock("lockName2");
RLock lock3 = redissonClient.getLock("lockName3");

// 初始化三个锁的合并锁
RLock multiLock = redissonClient.getMultiLock(lock1, lock2, lock3);

// 获取锁
try{
multiLock.lock();
//do something
}finally{
 multiLock.unlock();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/55807.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

unity行为决策树实战详解

一、行为决策树的概念 行为决策树是一种用于游戏AI的决策模型&#xff0c;它将游戏AI的行为分解为一系列的决策节点&#xff0c;并通过节点之间的连接关系来描述游戏AI的行为逻辑。在行为决策树中&#xff0c;每个节点都代表一个行为或决策&#xff0c;例如移动、攻击、逃跑等…

LInux的安装(VMware,网卡设置,SSH连接工具)

Linux的安装 1、安装方式介绍 1.安装方式: 物理机安装:直接将操作系统安装到服务器硬件上 虚拟机安装:通过虚拟机软件安装 **虚拟机( Virtual Machine&#xff09;**指通过软件模拟的具有完整硬件系统功能、运行在完全隔离环境中的完整计算机系统。 2、安装Linux 在官网将…

20款奔驰S350升级原厂HUD抬头显示系统,提升您的行车安全

HUD是平视显示器的简称&#xff0c;它原先是运用在航空器上的飞行辅助仪器。指飞行员不需要低头&#xff0c;就能够看到他需要的重要资讯。由于HUD的方便性以及能够提高飞行安全&#xff0c;这项技术后来也发展到汽车行业。汽车搭载的HUD抬头数字显示功能&#xff0c;是利用光学…

Toolformer :让AI学会使用工具

paper: 《Toolformer: Language Models Can Teach Themselves to Use Tools 》 核心思想&#xff0c; 1. Sampling API Calls &#xff1a;设计设计prompt,让模型生成含API调用的文本&#xff08;如图3&#xff09;&#xff0c;只保留K个概率最高的API调用 2. Executing API …

MySql006——基本的SELECT查询语句

在《MySql003——结构化查询语言SQL基础知识》中&#xff0c;我们学习了有关SQL的基础知识&#xff0c;也知道SQL中查询语句SELECT使用最为频繁 接下来我们将学习一些基本的SELECT查询语句 一、SELECT语句的通用语法 在MySQL数据库中&#xff0c;使用SELECT语句可以查询数据…

Android Studio多渠道打包

使用环境&#xff1a; Android studio 多渠道打包 使用方法&#xff1a; 1 APP下build.gradle文件 flavorDimensions "default"productFlavors {huawei {dimension "default"manifestPlaceholders [ channel:"huawei" ]}xiaomi {dimension &…

Nginx学习教程(基础篇)

目录 一、Nginx安装 二、Nginx基本使用 2.1、目录结构 conf html logs sbin 2.2、基本运行原理 2.3、nginx.conf最小配置解析 worker_processes worker_connections include mime.types default_type application/octet-stream sendfile on keepalive_timeout…

Python实现GA遗传算法优化循环神经网络分类模型(LSTM分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;最早是由美国的 John holland于20世…

lc1074.元素和为目标值的子矩阵数量

创建二维前缀和数组 两个for循环&#xff0c;外循环表示子矩阵的左上角&#xff08;x1,y1&#xff09;&#xff0c;内循环表示子矩阵的右下角&#xff08;x2,y2&#xff09; 两个for循环遍历&#xff0c;计算子矩阵的元素总和 四个变量&#xff0c;暴力破解的时间复杂度为O(…

驱动开发(中断)

头文件&#xff1a; #ifndef __LED_H__ #define __LED_H__#define PHY_LED1_MODER 0X50006000 #define PHY_LED1_ODR 0X50006014 #define PHY_LED1_RCC 0X50000A28#define PHY_LED2_MODER 0X50007000 #define PHY_LED2_ODR 0X50007014 #define PHY_LED2_RCC 0X50000A28#def…

【安装Tomcat,web站点部署】

Tomcat部署 第一种安装 yum install tomcat.noarch -y yum install tomcat-webapps.noarch -y #启动脚本 [rootlocalhost ~]# systemctl start tomcat这时可以通过IP地址端口访问 第二种安装 点击此处找到apache-tomcat-8.5.70.tar.gz 官网下载JDK #先安装JDK [rootlocalh…

小程序云开发快速入门(1/4)

前言 从上次完成了码仔备忘录本地版本后&#xff0c;码仔就养成了每天记录备忘录的好习惯&#xff0c;每周早上会记录下自己要做的任务&#xff0c;然后晚上在复盘一下今天的计划是否完成。 有一天&#xff0c;码仔看到它最喜欢的码妞在一旁愁眉苦脸。 码仔&#xff1a;“怎么…

思腾云计算

以AI赋能&#xff0c;致敬不凡 原创 Sitonholy 思腾合力 2023-04-28 07:00 发表于北京 收录于合集#品牌介绍156个 致敬不凡 以AI赋能 思 腾 合 力 人工智能的发展和应用与五一劳动节的意义和价值是相通的。人工智能的快速发展将会对劳动力市场和生产方式产生深远的影响&…

【Ansible】Ansible自动化运维工具之playbook剧本

playbook 一、playbook 的概述1. playbook 的概念2. playbook 的构成 二、playbook 的应用1. 安装 httpd 并启动2. 定义、引用变量3. 指定远程主机 sudo 切换用户4. when条件判断5. 迭代6. Templates 模块6.1 添加模板文件6.2 修改主机清单文件6.3 编写 playbook 7. tags 模块 …

谈一谈缓存穿透,击穿,雪崩

缓存穿透 缓存穿透是指在使用缓存系统时&#xff0c;频繁查询一个不存在于缓存中的数据&#xff0c;导致这个查询每次都要通过缓存层去查询数据源&#xff0c;无法从缓存中获得结果。这种情况下&#xff0c;大量的请求会直接穿透缓存层&#xff0c;直接访问数据源&#xff0c;…

GLM模型介绍

paper: 《GLM: General Language Model Pretraining with Autoregressive Blank Infilling》 摘要&#xff1a; 我们提出了一个基于自回归空白填充的通用语言模型&#xff08;GLM&#xff09;来解决这一挑战。GLM通过添加2D位置编码和允许任意顺序预测跨度来改进空白填充预训…

视频媒体有哪些?视频媒体采访服务怎么做?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 一&#xff0c;在国内&#xff0c;主流的视频媒体包括&#xff1a; 1. 电视台&#xff1a;包括国家级、地方性和专业性电视频道&#xff0c;涵盖各类新闻、综艺、娱乐、体育等节目。 2…

Linux下 Docker容器引擎基础(2)

目录 创建私有仓库 将修改过的nginx镜像做标记封装&#xff0c;准备上传到私有仓库 将镜像上传到私有仓库 从私有仓库中下载镜像到本地 CPU使用率 CPU共享比例 CPU周期限制 CPU 配额控制参数的混合案例 内存限制 Block IO 的限制 限制bps 和iops 创建私有仓库 仓库&a…

数据结构--基础知识

数据结构是什么&#xff1f; 数据结构是计算机科学中研究数据组织、存储和管理的方法和原则。它涉及存储和操作数据的方式&#xff0c;以便能够高效地使用和访问数据。 相关内容 基本组成 数组&#xff08;Array&#xff09;&#xff1a;数组是一种线性数据结构&#xff0c;…

为什么需要智能工业自动化网络?如何搭建?

在当今快节奏的社会中&#xff0c;工业自动化变得越来越重要。传统的手动操作和生产方式已经不能满足现代工业的需求。因此&#xff0c;建设工业自动化已成为一个必然趋势。通过不断进步的新技术创建更高效、更可靠、更安全的智能工业自动化网络。在本文中&#xff0c;我们将讨…