# Redisson分布式锁

# 概述

分布式锁是为了解决分布式场景下全局加锁的问题。 在单体项目中可以使用synchronize完成对于不同线程之间的资源争抢问题。 但是在分布式场景下,synchronize只能对其中一个项目进行资源控制,进程之间的资源增强仍然无法解距。 换言之,可以将分布式锁理解为对于整个分布式系统的synchronize。

分布式锁的使用场景如下,在分布式环境下:

img_1.png

上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象), 如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的! 即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性, 处理的结果也是不对的!

如果我们业务中确实存在这个场景的话,我们就需要使用分布式锁解决这个问题!

本章节主要介绍如何使用HOS-Redisson分布式锁, Redisson分布式锁具有互斥性、可重入性、锁超时、高效性、高可用、阻塞性、公平性特点, 防止分布式系统中的多个进程之间相互干扰

# 配置

# 1、导入maven依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-redis-starter</artifactId>
</dependency>

# 2、配置项

Redis支持单节点、集群、哨兵三种模式,具体配置请根据实际情况进行选择

spring:
  redis:
    #redis配置项 
    database: 0
    timeout: 5000
    port: 6379
    host: localhost
    #redisson配置项
    redisson:
      ## redisson 初始化配置
      ## redisson支持单节点、集群、哨兵三种模式
      config: |
        ## 公共配置
        threads: 16
        nettyThreads: 32
        codec: !<org.redisson.client.codec.StringCodec> { }
        transportMode: "NIO"
        lockWatchdogTimeout: 50000
        ## 单节点模式
        singleServerConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 10000
          retryAttempts: 3
          retryInterval: 1500
          # 在redis配置中已经配置使用的密码,该处可以不另外指定
          # password: null
          subscriptionsPerConnection: 5
          clientName: null
          # 在redis配置中已经配置地址与端口,该处可以不另外指定
          # address: "redis://localhost:6379"
          subscriptionConnectionMinimumIdleSize: 1
          subscriptionConnectionPoolSize: 50
          connectionMinimumIdleSize: 24
          connectionPoolSize: 64
          # 在redis配置中已经配置使用的database,该处可以不另外指定
          # database: 0
          dnsMonitoringInterval: 5000
        ## 集群模式
        clusterServersConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 3000
          retryAttempts: 3
          retryInterval: 1500
          failedSlaveReconnectionInterval: 3000
          failedSlaveCheckInterval: 60000
          # 在redis配置中已经配置使用的密码,该处可以不另外指定
          # password: null
          subscriptionsPerConnection: 5
          clientName: null
          loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
          subscriptionConnectionMinimumIdleSize: 1
          subscriptionConnectionPoolSize: 50
          slaveConnectionMinimumIdleSize: 24
          slaveConnectionPoolSize: 64
          masterConnectionMinimumIdleSize: 24
          masterConnectionPoolSize: 64
          readMode: "SLAVE"
          subscriptionMode: "SLAVE"
          # 在redis配置中已经配置地址与端口,该处可以不另外指定
          # nodeAddresses:
          #  - "redis://127.0.0.1:7004"
          #  - "redis://127.0.0.1:7001"
          #  - "redis://127.0.0.1:7000"
          scanInterval: 1000
          pingConnectionInterval: 30000
          keepAlive: false
          tcpNoDelay: false
        ## 哨兵模式
        sentinelServersConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 3000
          retryAttempts: 3
          retryInterval: 1500
          # 在redis配置中已经配置使用的密码,该处可以不另外指定
          # password: null
          subscriptionsPerConnection: 5
          clientName: null
          loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
          slaveSubscriptionConnectionMinimumIdleSize: 1
          slaveSubscriptionConnectionPoolSize: 50
          slaveConnectionMinimumIdleSize: 32
          slaveConnectionPoolSize: 64
          masterConnectionMinimumIdleSize: 32
          masterConnectionPoolSize: 64
          readMode: "SLAVE"
          # 在redis配置中已经配置地址与端口,该处可以不另外指定
          # sentinelAddresses:
          #  - "redis://127.0.0.1:26379"
          #  - "redis://127.0.0.1:26389"
          masterName: "mymaster"
          # 在redis配置中已经配置使用的database,该处可以不另外指定
          #database: 0
        
        

注意

redisson配置中使用的模式需要与redis中保持一致。正常使用过程中只配置对应的模式,其他的请直接删除。

若其他模式的配置项未删除,会按照优先度 单节点-哨兵-集群 初始化对应模式。 如果有单节点模式的配置项,采用单节点模式,没有单节点配置下, 查看是否有哨兵模式配置项,如果有则采用哨兵模式, 以此类推

redisson中对应模式的配置项,地址、密码、使用的database不需要单独指定,会采用和redis中相同的配置

# redisson 配置项功能说明(对应 spring.redis.redisson.config中的配置)

下面依次介绍: 公共配置, 集群模式配置, 单Redis节点模式配置, 哨兵模式配置,

# 公共配置
  • lockWatchdogTimeout

    监控锁的看门狗超时,单位:毫秒,默认值:`30000`。

    获取锁时,如果设置了锁的过期时间(leaseTimeout),此时不会触发看门狗。 若不设置过期时间,会触发看门狗,在当前线程获取到锁后,看门狗会不断延续锁的时间。 从而保证当前线程在正常处理逻辑中,锁不会被其他线程占用。若当前线程由于未知原因宕机,看门狗也会停止, 保证该锁不会被一直占用,影响其他线程的对于锁的使用。

  • codec

    编码,默认值: org.redisson.codec.JsonJacksonCodec

    该项是配置Redisson的对象编码类,是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储, 不会影响到程序的正常使用

    Redisson提供了以下几种的对象编码应用,以供大家选择:

    编码类名称 说明
    org.redisson.codec.JsonJacksonCodec Jackson JSON 编码 默认编码
    org.redisson.codec.AvroJacksonCodec Avro 一个二进制的JSON编码
    org.redisson.codec.SmileJacksonCodec Smile 另一个二进制的JSON编码
    org.redisson.codec.CborJacksonCodec CBOR 又一个二进制的JSON编码
    org.redisson.codec.MsgPackJacksonCodec MsgPack 再来一个二进制的JSON编码
    org.redisson.codec.IonJacksonCodec Amazon Ion 亚马逊的Ion编码,格式与JSON类似
    org.redisson.codec.KryoCodec Kryo 二进制对象序列化编码
    org.redisson.codec.SerializationCodec JDK序列化编码
    org.redisson.codec.FstCodec FST 10倍于JDK序列化性能而且100%兼容的编码
    org.redisson.codec.LZ4Codec LZ4 压缩型序列化对象编码
    org.redisson.codec.SnappyCodec Snappy 另一个压缩型序列化对象编码
    org.redisson.client.codec.JsonJacksonMapCodec 基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。
    org.redisson.client.codec.StringCodec 纯字符串编码(无转换)
    org.redisson.client.codec.LongCodec 纯整长型数字编码(无转换)
    org.redisson.client.codec.ByteArrayCodec 字节数组编码
    org.redisson.codec.CompositeCodec 用来组合多种不同编码在一起
  • threads

    线程池数量, 默认值: 当前处理核数量 * 2 。 这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。

  • nettyThreads

    Netty线程池数量, 默认值: 当前处理核数量 * 2。 这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务, 以及底层客户端所一同共享的线程池里保存的线程数量。

  • transportMode 传输模式 ,默认值:TransportMode.NIO。 可选参数: TransportMode.NIO, TransportMode.EPOLL - 需要依赖里有netty-transport-native-epoll包(Linux) TransportMode.KQUEUE - 需要依赖里有 netty-transport-native-kqueue包(macOS)

# 集群模式

集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版。

配置项为 clusterServersConfig,其参数如下:

  • nodeAddresses

    节点地址,可以通过host:port的格式来添加Redis集群节点的地址。多个节点可以一次性批量添加。

  • scanInterval

    集群扫描间隔时间 ,默认值: 1000

  • slots

    分片数量 ,默认值: 231 用于指定数据分片过程中的分片数量

  • readMode

    读取操作的负载均衡模式 ,默认值: SLAVE(只在从服务节点里读取),设置读取操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里读取。 MASTER - 只在主服务节点里读取。 MASTER_SLAVE - 在主从服务节点里都可以读取。

  • subscriptionMode

    订阅操作的负载均衡模式 ,默认值:SLAVE(只在从服务节点里订阅)。设置订阅操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里订阅。 MASTER - 只在主服务节点里订阅。

  • loadBalancer

    负载均衡算法类的选择 ,默认值: org.redisson.connection.balancer.RoundRobinLoadBalancer。 在多Redis服务节点的环境里,可以选用以下几种负载均衡方式选择一个节点: org.redisson.connection.balancer.WeightedRoundRobinBalancer - 权重轮询调度算法 org.redisson.connection.balancer.RoundRobinLoadBalancer - 轮询调度算法 org.redisson.connection.balancer.RandomLoadBalancer - 随机调度算法

  • subscriptionConnectionMinimumIdleSize

    从节点发布和订阅连接的最小空闲连接数 ,默认值:1 。 多从节点的环境里,每个从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。 Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。

  • subscriptionConnectionPoolSize

    从节点发布和订阅连接池大小 ,默认值:50。 多从节点的环境里,每个从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。

  • slaveConnectionMinimumIdleSize

    从节点最小空闲连接数 ,默认值:32。 多从节点的环境里,每个从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。 长期保持一定数量的连接有利于提高瞬时读取反映速度。

  • slaveConnectionPoolSize

    从节点连接池大小 ,默认值:64。 多从节点的环境里,每个从服务节点里用于普通操作(非 发布和订阅)连接的连接池最大容量。 连接池的连接数量自动弹性伸缩。

  • masterConnectionMinimumIdleSize

    主节点最小空闲连接数,默认值:32。 多节点的环境里,每个主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。

  • masterConnectionPoolSize

    主节点连接池大小 ,默认值:64。 多主节点的环境里,每个主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。

  • idleConnectionTimeout

    连接空闲超时,单位:毫秒 ,默认值:10000。 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值, 那么这些连接将会自动被关闭,并从连接池里去掉。

  • connectTimeout

    连接超时,单位:毫秒 ,默认值:10000。 同任何节点建立连接时的等待超时。

  • timeout

    命令等待超时,单位:毫秒 ,默认值:3000。 等待节点回复命令的时间。该时间从命令发送成功时开始计时。

  • retryAttempts

    命令失败重试次数 ,默认值:3。 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。

  • retryInterval

    命令重试发送时间间隔,单位:毫秒 ,默认值:1500。 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时, 该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。

  • password

    密码 ,默认值:null。 用于节点身份验证的密码。

  • subscriptionsPerConnection

    单个连接最大订阅数量 ,默认值:5。每个连接的最大订阅数量。

  • clientName

    客户端名称 ,默认值:null。在Redis节点里显示的客户端名称。

  • sslEnableEndpointIdentification

    启用SSL终端识别 ,默认值:true。开启SSL终端识别能力。

  • sslProvider

    SSL实现方式 ,默认值:JDK。确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。

  • sslTruststore

    SSL信任证书库路径 ,默认值:null。指定SSL信任证书库的路径。

  • sslTruststorePassword

    SSL信任证书库密码 ,默认值:null。指定SSL信任证书库的密码。

  • sslKeystore

    SSL钥匙库路径 ,默认值:null。指定SSL钥匙库的路径。

  • sslKeystorePassword

    SSL钥匙库密码 ,默认值:null。指定SSL钥匙库的密码。

# 单Redis节点模式

配置项为 singleServerConfig 参数如下:

  • address

    节点地址。 可以通过host:port的格式来指定节点地址。

  • subscriptionConnectionMinimumIdleSize

    发布和订阅连接的最小空闲连接数,默认值:1。 用于发布和订阅连接的最小保持连接数(长连接)。 Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。

  • subscriptionConnectionPoolSize

    发布和订阅连接池大小,默认值:50 。用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。

  • connectionMinimumIdleSize

    最小空闲连接数(长连接),默认值:32。 长期保持一定数量的连接有利于提高瞬时写入反应速度。

  • connectionPoolSize

    连接池大小,默认值:64。 在启用该功能以后,Redisson将会监测DNS的变化情况。

  • dnsMonitoringInterval

    DNS监测时间间隔,单位:毫秒,默认值:5000。 监测DNS的变化情况的时间间隔。

  • idleConnectionTimeout 连接空闲超时,单位:毫秒, 默认值:10000。 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值, 那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。

  • connectTimeout

    连接超时,单位:毫秒, 默认值:10000。 同节点建立连接时的等待超时。

  • timeout

    命令等待超时,单位:毫秒,默认值:3000。 等待节点回复命令的时间。该时间从命令发送成功时开始计时。

  • retryAttempts

    命令失败重试次数,默认值:3。 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时, 将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。

  • retryInterval

    命令重试发送时间间隔,单位:毫秒,默认值:1500。 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时, 该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。

  • database

    数据库编号,默认值:0。尝试连接的数据库编号。

  • password

    密码,默认值:null。用于节点身份验证的密码。

  • subscriptionsPerConnection

    单个连接最大订阅数量,默认值:5。 每个连接的最大订阅数量。

  • clientName

    客户端名称,默认值:null。在Redis节点里显示的客户端名称。

  • sslEnableEndpointIdentification

    启用SSL终端识别,默认值:true。 开启SSL终端识别能力。

  • sslProvider

    SSL实现方式,默认值:JDK。确定采用哪种方式(JDKOPENSSL)来实现SSL连接。

  • sslTruststore

    SSL信任证书库路径,默认值:null。指定SSL信任证书库的路径。

  • sslTruststorePassword

    SSL信任证书库密码,默认值:null。指定SSL信任证书库的密码。

  • sslKeystore

    SSL钥匙库路径,默认值:null。指定SSL钥匙库的路径。

  • sslKeystorePassword

    SSL钥匙库密码,默认值:null。指定SSL钥匙库的密码。

# 哨兵模式

配置项为 sentinelServersConfig 参数如下:

  • dnsMonitoringInterval

    DNS监控间隔,单位:毫秒,默认值:5000。 用来指定检查节点DNS变化的时间间隔。 使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。

  • masterName

    主服务器的名称,主服务器的名称是哨兵进程中用来监测主从服务切换情况的。

  • addSentinelAddress

    添加哨兵节点地址,可以通过host:port的格式来指定哨兵节点的地址。多个节点可以一次性批量添加。

  • readMode

    读取操作的负载均衡模式,默认值: SLAVE(只在从服务节点里读取) 。 设置读取操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里读取。 MASTER - 只在主服务节点里读取。 MASTER_SLAVE - 在主从服务节点里都可以读取。

  • subscriptionMode

    订阅操作的负载均衡模式,默认值:SLAVE(只在从服务节点里订阅)。 设置订阅操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里订阅。 MASTER - 只在主服务节点里订阅。

  • loadBalancer

    负载均衡算法类的选择,默认值: org.redisson.connection.balancer.RoundRobinLoadBalancer。 在使用多个Redis服务节点的环境里, 可以选用以下几种负载均衡方式选择一个节点: org.redisson.connection.balancer.WeightedRoundRobinBalancer - 权重轮询调度算法 org.redisson.connection.balancer.RoundRobinLoadBalancer - 轮询调度算法 org.redisson.connection.balancer.RandomLoadBalancer - 随机调度算法

  • subscriptionConnectionMinimumIdleSize

    从节点发布和订阅连接的最小空闲连接数,默认值:1。 多从节点的环境里,每个从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。 Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。

  • subscriptionConnectionPoolSize

    从节点发布和订阅连接池大小,默认值:50。 多从节点的环境里,每个从服务节点里用于发布和订阅连接的连接池最大容量。 连接池的连接数量自动弹性伸缩。

  • slaveConnectionMinimumIdleSize

    从节点最小空闲连接数,默认值:32。 多从节点的环境里,每个从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。 长期保持一定数量的连接有利于提高瞬时读取反映速度。

  • slaveConnectionPoolSize

    从节点连接池大小,默认值:64。 多从节点的环境里,每个从服务节点里用于普通操作(非 发布和订阅)连接的连接池最大容量。 连接池的连接数量自动弹性伸缩。

  • masterConnectionMinimumIdleSize

    主节点最小空闲连接数,默认值:32。 多从节点的环境里,每个主节点的最小保持连接数(长连接)。 长期保持一定数量的连接有利于提高瞬时写入反应速度。

  • masterConnectionPoolSize

    主节点连接池大小,默认值:64。主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。

  • idleConnectionTimeout

    连接空闲超时,单位:毫秒,默认值:10000。 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值, 那么这些连接将会自动被关闭,并从连接池里去掉。

  • connectTimeout

    连接超时,单位:毫秒,默认值:10000。同任何节点建立连接时的等待超时。

  • timeout

    命令等待超时,单位:毫秒,默认值:3000。等待节点回复命令的时间。该时间从命令发送成功时开始计时。

  • retryAttempts

    命令失败重试次数,默认值:3。 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时, 将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。

  • retryInterval

    命令重试发送时间间隔,单位:毫秒,默认值:1500。在某个节点执行相同或不同命令时, 连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除, 直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。

  • database

    数据库编号,默认值:0。尝试连接的数据库编号。

  • password

    密码,默认值:null。用于节点身份验证的密码。

  • subscriptionsPerConnection

    单个连接最大订阅数量,默认值:5。每个连接的最大订阅数量。

  • clientName

    客户端名称,默认值:null。在Redis节点里显示的客户端名称。

  • sslEnableEndpointIdentification

    启用SSL终端识别,默认值:true。开启SSL终端识别能力。

  • sslProvider

    SSL实现方式,默认值:JDK。确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。

  • sslTruststore

    SSL信任证书库路径,默认值:null。指定SSL信任证书库的路径。

  • sslTruststorePassword

    SSL信任证书库密码,默认值:null。指定SSL信任证书库的密码。

  • sslKeystore

    SSL钥匙库路径,默认值:null。指定SSL钥匙库的路径。

  • sslKeystorePassword

    SSL钥匙库密码,默认值:null。指定SSL钥匙库的密码。

# 分布式锁以及介绍

Redisson分布式锁共支持 可重入锁公平锁联锁红锁读写锁闭锁信号量可过期信号量 八种类型的锁, 来解决不同场景下的使用需求.

# 可重入锁(Reentrant Lock)

描述: 该类型锁为基本类型的分布式锁(独占锁)。 多个线程请求该类型的同一个锁时,会进行抢锁, 抢占到锁的线程会继续进行,未获取到锁的线程会进入线程阻塞状态,并等待锁被释放, 在锁被释放后,继续进行抢锁

img.png

RLock lock = redisson.getLock("anyLock");
// 加锁,最常见的使用方法
lock.lock();

// 加锁,并设置锁有效时间
// 第一个参数为有效时间,第二个参数为时间单位
// 下方的例子的含义为,添加一个有效时间为10s的锁
lock.lock(10, TimeUnit.SECONDS);
...
        
//解锁
lock.unlock();

# 公平锁(Fair Lock)

描述: 当多个线程请求同一个锁时,第一个抢占到锁的线程继续进行,其他线程按照请求锁的先后顺序 进行排队。未获取到锁的线程会进入线程阻塞状态,并等待锁被释放。 锁被释放之后按照排队的先后顺序,由第二个线程获取到锁,其他线程继续等待直到获取到锁.

与可重入锁不同点在于,公平锁按照线程请求的先后顺序依次获取到锁,等待锁的线程不会饿死。

  • 应用:需要按照排队顺序获取公共资源,例如抢票,生活中例如ATM排队取钱
RLock fairLock = redisson.getFairLock("anyLock");
// 加锁,最常见的使用方法
fairLock.lock();

// 加锁,并设置锁有效时间
// 第一个参数为锁的有效时间,第二个参数为时间单位
// 下方的例子的含义为,添加一个有效时间为10s的锁
fairLock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,并设置锁过期时间
// 第一个参数为尝试时间,第二个参数为锁有效时间,第三个参数为时间单位
// 下面例子的含义为:在100秒时间内尝试加锁,若获取到锁,返回true,锁的有效时间为10s,
// 若100秒内未获取到锁,返回false
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
// 解锁
fairLock.unlock();

# 联锁(MultiLock)

描述: 联锁为分布式独占锁,能够多个Redis中进行加锁操作。 触发加锁时会对所有关联的锁尝试加锁,若所有的加锁成功,才可以加锁成功。否则,均不会加锁成功

应用: 一个线程需要同时占用多个公共资源的情况, 例如生活中,一次会议需要同时有会议室、会议相关人员,两项一项不满足便无法进行会议。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

// 联锁,加锁时会同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);

// 尝试加锁,并设置锁过期时间
// 第一个参数为尝试时间,第二个参数为锁有效时间,第三个参数为时间单位
// 下面例子的含义为:在100秒时间内尝试加锁,若获取到锁,返回true,锁的有效时间为10s,
// 若100秒内未获取到锁,返回false
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

# 红锁(RedLock)

描述: 红锁为分布式独占锁,能够多个Redis中的锁进行加锁操作。 触发加锁时会对全部的锁尝试加锁,均过半的锁加锁成功,则标识为成功。

应用:用于多台redis,解决主从场景下,主服务器宕机数据未及时同步引发的问题

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

// 红锁,加锁时同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);

// 尝试加锁,并设置锁过期时间
// 第一个参数为尝试时间,第二个参数为锁有效时间,第三个参数为时间单位
// 下面例子的含义为:在100秒时间内尝试加锁,若获取到锁,返回true,锁的有效时间为10s,
// 若100秒内未获取到锁,返回false
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

# 读写锁(ReadWriteLock)

描述:分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。 读锁为共享锁,写锁为独占锁,读锁与写锁之间为互斥关系

应用: 读写锁把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作. 该类型锁适合于对数据结构的读次数比写次数多得多的情况。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
// 加读锁
rwlock.readLock().lock();
// 加写锁
rwlock.writeLock().lock();

// 加读锁,并设置锁过期时间
// 第一个参数为锁的有效时间,第二个参数为时间单位
// 下方的例子的含义为,添加一个有效时间为10s的锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);

// 加写锁,并设置锁过期时间
// 第一个参数为锁的有效时间,第二个参数为时间单位
// 下方的例子的含义为,添加一个有效时间为10s的锁
rwlock.writeLock().lock(10, TimeUnit.SECONDS);


// 加读锁,尝试加锁,并设置锁过期时间
// 第一个参数为尝试时间,第二个参数为锁有效时间,第二个参数为时间单位
// 下面例子的含义为:在100秒时间内尝试加锁,若获取到锁,返回true,锁的有效时间为10s,
// 若100秒内未获取到锁,返回false
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

// 加写锁,尝试加锁,并设置锁过期时间
// 第一个参数为尝试时间,第二个参数为锁有效时间,第二个参数为时间单位
// 下面例子的含义为:在100秒时间内尝试加锁,若获取到锁,返回true,锁的有效时间为10s,
// 若100秒内未获取到锁,返回false
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
rwlock.readLock().unlock();
rwlock.writeLock().unlock();

# 闭锁( CountDownLatch)

描述:一般应用于分布式场景,例如分成主要线程,和副线程。

主要线程设置闭锁总量,并进入线程阻塞状态等待闭锁总量归0(即等待其他线程处理完成)。 副线程进行逻辑处理(主要线程的前置处理),处理完成之后将闭锁总量减1。

应用:适用于分布式环境下,在要完成某些运算时,只有其它线程的运算全部运行完毕,当前线程才继续下去。

//主要进程设置闭锁总量,并进入阻塞状态等待闭锁总量归0(即等待其他线程处理完成)
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

//进行主要进程前置处理的线程。处理完成之后将闭锁总量减1
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

# 信号量(Semaphore)

描述: 信号量中存在一个总量(总信号量),当前线程获取信号时, 若信号不足,会进入线程阻塞状态,等待信号被释放达到所需量,当前线程才可以继续运行

应用:可在分布式限流场景下使用,例如抢占车位等。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
//设置总信号量
semaphore.trySetPermits(100);

//获取信号,只获取1个。
// 若信号量总信号归0,该线程会进入线程阻塞状态,等待其他线程释放信号,才可以继续运行
semaphore.acquire();
//获取信号,获取多个,方法参数为获取的数量。
// 若信号量数量不足,该线程会进入线程阻塞状态,等待其他线程释放信号,且信号达到所需的值,才可以继续运行
semaphore.acquire(23);

//释放信号,释放多个,方法参数为释放信号的数量。
semaphore.release(10);
//释放信号,释放1个信号
semaphore.release();

# 可过期信号量(PermitExpirableSemaphore)

描述: 每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。 除此之外锁对于线程的阻塞和继续运行状态的处理和信号量相同

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
// 获取一个信号
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

# 使用示例

# 可重入锁

    //注入redissonClient对象
    @Autowired
    private RedissonClient redisson;

    /**
     * 锁使用示例--可重入锁
     * 多个线程抢占锁,未获取到锁进入阻塞状态
     *
     *
     * 功能描述:获取到当前的锁并加锁,等待5s后解除锁,以下其他方法功能类似
     *
     */
    @PostMapping("/normalLock")
    public BaseResponse<String> normalLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置可重入锁
        RLock normalLock = redisson.getLock("normalLock1");

        try {
            //可重入锁上锁。第一个参数为锁的有效时间,第二个参数为时间的单位,当前使用的是s(秒)
            normalLock.lock(10, TimeUnit.SECONDS);
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
//            normalLock.tryLock(60, 10, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //是否被锁
        boolean result= normalLock.isLocked();

        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
            normalLock.unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

同时执行10个请求,运行结果:

redisson_1.png

# 公平锁

/**
     * 锁使用示例--公平锁
     * 线程会进入排队,并按照顺序依次进行,未获取到锁进入阻塞状态
     *
     */
    @PostMapping("/fairLock")
    public BaseResponse<String> fairLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置公平锁。线程会进入排队,并按照顺序依次进行
        RLock fairLock = redisson.getFairLock("fairLock1");
        try {
            //公平锁上锁,第一个参数为锁的有效时间,第二个参数为时间的单位,当前使用的是s(秒)
            fairLock.lock(10, TimeUnit.SECONDS);
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
//            fairLock.tryLock(60, 10, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }


        //是否被锁
        boolean result= fairLock.isLocked();

        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
            fairLock.unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

同时执行10个请求,运行结果:

img.png

与可重入锁相比,公平锁在不同线程获取锁时按先后顺序会进入排队,最后按照队列中顺序逐一获取到锁

# 联锁


    //实例化redis
    public RedissonClient getRedisson1(){
        Config config = new Config();
        config.useSingleServer()
                .setAddress( "redis://127.0.0.1:6379" )
                .setConnectTimeout(5000)
                .setDatabase(0);
        config.setCodec(new StringCodec());
        return Redisson.create(config);
    }

    //实例化redis
    public RedissonClient getRedisson2(){
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6380" )
                .setConnectTimeout(5000)
                .setDatabase(0);
        config.setCodec(new StringCodec());
        return Redisson.create(config);
    }

    //实例化redis
    public RedissonClient getRedisson3(){
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6381" )
                .setConnectTimeout(5000)
                .setDatabase(0);
        config.setCodec(new StringCodec());
        return Redisson.create(config);
    }


    /**
     * 锁使用示例--联锁
     *
     * 将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
     * 当全部锁加锁成功,才会成功。用于多台redis
     *
     */
    @PostMapping("/multiLock")
    public BaseResponse<String> multiLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        RedissonClient redissonClient2=this.getRedisson2();
        RedissonClient redissonClient1=this.getRedisson1();
        RedissonClient redissonClient3=this.getRedisson3();

        //设置锁
        RLock lock1 = redissonClient1.getLock("multiLock1");
        RLock lock2 = redissonClient2.getLock("multiLock1");
        RLock lock3 = redissonClient3.getLock("multiLock1");

        //联锁。全部上锁成功,联锁上锁成功
        RedissonMultiLock mutilock = new RedissonMultiLock(lock1, lock2, lock3);
        boolean result=false;
        try {
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
            result=mutilock.tryLock(160, 80, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
//            mutilock.unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

执行结果:

img.png

img_1.png

img_2.png

三个Redis中均加锁成功

# 红锁

/**
     * 锁使用示例--红锁
     *
     * 一半以上的锁加锁成功,变判定成功。用于多台redis,解决主从场景下,主服务器宕机数据未及时同步引发的问题
     * 若需同时使用多个Redisson实例,需要开发者手动创建
     */
    @PostMapping("/redLock")
    public BaseResponse<String> redLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");


        RedissonClient redissonClient1=this.getRedisson1();
        RedissonClient redissonClient2=this.getRedisson2();
        RedissonClient redissonClient3=this.getRedisson3();

        //设置锁
        RLock lock1 = redissonClient1.getLock("redLock1");
        RLock lock2 = redissonClient2.getLock("redLock2");
        RLock lock3 = redissonClient3.getLock("redLock3");

        //设置红锁。大部分锁上锁成功,红锁上锁成功
        RedissonRedLock lockRed = new RedissonRedLock(lock1, lock2, lock3);

        boolean result=false;
        try {
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
            result=lockRed.tryLock(60, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }

        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
            //lockRed.unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

运行结果:

img.png

img_1.png

img_2.png

三个Redis中均加锁成功

# 读写锁

/**
*
* 读写锁
*
* 读写锁把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作
* 适合于对数据结构的读次数比写次数多得多的情况. 因为, 读模式锁定时可以共享, 以写模式锁住时意味着独占, 所以读写锁又叫共享-独占锁.
*
*/

    /**
     * 锁使用示例--读写锁-读锁
     */
    @PostMapping("/readLock")
    public BaseResponse<String> readLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置锁
        RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");

        try {
            //读写锁-读锁上锁,读锁为共享锁。第一个参数为锁的有效时间,第二个参数为时间的单位,当前使用的是s(秒)
            rwlock.readLock().lock(10, TimeUnit.SECONDS);
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
            //rwlock.readLock().tryLock(60, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //是否被锁
        boolean result= rwlock.readLock().isLocked();

        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
            rwlock.readLock().unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

    /**
     * 锁使用示例--读写锁-写锁
     */
    @PostMapping("/writeLock")
    public BaseResponse<String> writeLockExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置锁
        RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");

        try {
            //读写锁-写锁上锁,读锁为排它锁。第一个参数为锁的有效时间,第二个参数为时间的单位,当前使用的是s(秒)
            rwlock.writeLock().lock(10, TimeUnit.SECONDS);
            //尝试获取锁,如果获取成功返回true。第一个参数为尝试获取锁的时间(在该时间内获取成功则返回true),第二个参数为锁的有效时间,第三个参数为时间的单位,当前使用的是s(秒)
            //rwlock.writeLock().tryLock(60, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //是否被锁
        boolean result= rwlock.writeLock().isLocked();

        if(result){
            System.out.println("线程"+threadIndex+"获取锁成功");
            try{
                Thread.sleep(5000);
            }catch (Exception e){

            }
            rwlock.writeLock().unlock();
            System.out.println("线程"+threadIndex+"锁解锁");
        }else{
            System.out.println("线程"+threadIndex+"锁获取失败!!!");
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

同时执行10个请求,运行结果:

读锁:

img.png

可见10个线程均获取到了读锁

写锁:

img.png

只有上一个写锁被释放之后,其他的线程才可以获取到写锁

# 闭锁

/**
     * 锁使用示例--闭锁
     *
     * 场景:分布式环境下,在要完成某些运算时,只有其它线程的运算全部运行完毕,当前运算才继续下去。
     */

    /**
     * 主进程
     * 设置闭锁总量并进入阻塞状态,等待副进程处理完成,即闭锁中的总量归0
     *
     * @param reqBody
     * @return
     */
    @PostMapping("/countDownLatchM")
    public BaseResponse<String> countDownLatchMExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置锁
        RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
        //设置总量
        latch.trySetCount(10);

        try {
            //进入阻塞,等总量清0之后才会继续执行
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return BaseResponse.success("LockTest"+threadIndex);
    }

    /**
     * 其他处理调用(副进程)
     * 主要处理主进程需要继续进行之前,需要完成的计算或者处理逻辑。在处理完成之后,将闭锁的总量减1
     *
     * @param reqBody
     * @return
     */
    @PostMapping("/countDownLatchF")
    public BaseResponse<String> countDownLatchFExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置锁
        RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
        try {
            //闭锁,减少计数
            System.out.println("线程"+threadIndex+"闭锁:减少计数");
            latch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
        //获取剩余的量
        long result= latch.getCount();
        System.out.println("线程"+threadIndex+"闭锁;剩余为:"+result);

        return BaseResponse.success("LockTest"+threadIndex);
    }




先运行主进程,再运行10次副进程:

img.png

主进程运行之后进入阻塞,等副进程全部运行完成之后,此时闭锁总量归0,主进程继续进行

# 信号量

/**
     * 锁使用示例--信号量,信号量归0,进入阻塞状态
     *
     * 场景:分布式限流,车位等
     *
     */
    @PostMapping("/Semaphore")
    public BaseResponse<String> SemaphoreExample(@RequestBody String reqBody){
        //当前的线程排序值
        String threadIndex="0";
        threadIndex= JSONUtil.parseObj(reqBody).getStr("threadIndex");

        //设置锁
        RSemaphore semaphore = redisson.getSemaphore("semaphore");
        //设置总信号
        //semaphore.trySetPermits(5);

        try {
            //获取一个信号,即总量减一。如果总信号不足,线程会等待
            semaphore.acquire();

            //等待1s,模拟代码执行时间
            try{
                Thread.sleep(1000);
            }catch (Exception e){

            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        //获取剩余信号
        long result= semaphore.availablePermits();
        System.out.println("线程"+threadIndex+";剩余信号为:"+result);
        //可获取并返回立即可用的所有信号个数,并且将可用信号置0
        //semaphore.drainPermits();

        //释放一个信号,即总量加一
        semaphore.release();

        return BaseResponse.success("LockTest"+threadIndex);
    }

运行结果:

img.png