SpringBoot支持Redis的分布式锁

阅读数:155 评论数:0

跳转到新版页面

分类

python/Java

正文

一、简介

分布式锁,其原理就是多台机器去争抢一个资源,谁争抢成功,那么就持有这把锁。

可以通过多种途径实现分布式锁,例如数据库,插入一条记录(唯一索引),谁插入成功,谁就持有;还可以通过zookeeper,谁创建节点成功,谁就持有锁。

二、使用RedisTemplate

@Component
public class DistributedLock {
​
 @Autowired
 private StringRedisTemplate redisTemplate;
/**
 * 获得锁
 */
 public boolean getLock(String lockId, long millisecond) {
 Boolean success = redisTemplate.opsForValue().setIfAbsent(lockId, "lock",
 millisecond, TimeUnit.MILLISECONDS);
 return success != null && success;
 }
​
public void releaseLock(String lockId) {
 redisTemplate.delete(lockId);
 }
}

三、进一步完善(看门狗、守护进程)

对于超时操作,有一个专门的线程来监视,watchDog进行锁续命。

四、使用redission(与redisTemplate共用)

如果要想同是地使用redisTemplate和redission,不要使用redission-spring-boot-starter自动装配,而是自己来注册RedissinClient,这们RedisTemplate底层使用Connection实现就是LettuceConnection了。

1、pom引入

https://mvnrepository.com/artifact/org.redisson/redisson
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.4</version>
        </dependency>

另外记得引入

<!-- springboot提供的自动配置和依赖注入 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

2、配置注入

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Value("${redis.master.name}")
    private String masterName;

    @Value("${redis.sentinel.ip1}")
    private String sentinelIp1;
    @Value("${redis.sentinel.ip2}")
    private String sentinelIp2;
    @Value("${redis.sentinel.ip3}")
    private String sentinelIp3;
    @Value("${redis.sentinel.port1}")
    private String port1;
    @Value("${redis.sentinel.port2}")
    private String port2;
    @Value("${redis.sentinel.port3}")
    private String port3;
    @Value("${redis.database}")
    private int dataBase;
    @Value("${redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSentinelServers().setMasterName(masterName)
                .addSentinelAddress("redis://"+sentinelIp1+":"+port1)
                .addSentinelAddress("redis://"+sentinelIp2+":"+port2)
               .addSentinelAddress("redis://"+sentinelIp3+":"+port3).setPassword(password)
                .setDatabase(dataBase).setTimeout(3600);
        System.out.println("sentinelIp1 = " + sentinelIp1);
        return Redisson.create(config);
    }
}

集群方式

Config config = new Config();
config.useClusterServers().addNodeAddress("127.0.0.1:7000")
        .addNodeAddress("127.0.0.1:7001")
        .setPassword("password").setScanInterval(2000);

单实例方式

    // redission通过redissonClient对象使用 // 如果是多个redis集群,可以配置
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson(){
        Config config = new Config();
        // 创建单例模式的配置
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("xudaze200129");
        return Redisson.create(config);
    }

3、加锁逻辑

    @Autowired
    private RedissonClient redissonClient;
    
    public Map<String, List<IndexCategoryVo>> getIndexCategoryMapRedissionLock() {
        // 占锁 没有拿到锁的会自动阻塞
        // watchDog 机制 : 锁自动加了默认30秒过期
        // 如果业务代码耗时长,锁也会自动续期
        RLock lock = redissonClient.getLock(CategoryServiceImpl.LOCK);
        lock.lock();
        try {
        	//业务逻辑
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return null;
    }

五、只使用redission

1、pom依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.6</version>
</dependency>

引入次依赖后,无需再引入spring-boot-starter-data-redis,其redisson-spring-boot-starter内部已经进行了引入,且排除了Redis的Luttuce 以及 Jedis客户端

2、配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    # password:
    # database: 0     #指定数据库,默认为0
    # timeout: 3000   #连接超时时间,单位毫秒,默认为0。也可以这么写:3s
    # ssl: false      # 是否启用SSL连接,默认false
    # pool: #连接池配置
    #   max-active: 8 #最大活跃连接数,默认8个。
    #   max-idle: 8   #最大空闲连接数,默认8个。
    #   max-wait: -1  #获取连接的最大等待时间,默认-1,表示无限制,单位毫秒。
    #                 #默认值可能会因为获取不到连接,导致事务无法提交,数据库被锁,大量线程处于等待状态的情况。
    #   min-idle: 0   #最小空闲连接数,默认0。
    # sentinel:
    #   master: myMaster             #哨兵master
    #   nodes: host1:port,host2:port #哨兵节点
    # cluster:
    #   max-redirects:               # 集群模式下,集群最大转发的数量
    #   nodes: host1:port,host2:port # 集群节点

3、RedissionConfig

// 
@ConfigurationProperties(prefix = "spring.redis")
@Configuration
@Data
public class RedissonConfig {
 
    // 读取配置文件里面的Redis信息
    private String host;
    private String port;
    private String password;
    private Cluster cluster;
    
    public static class Cluster {
        private List<String> nodes;
        public List<String> getNodes() {
            return nodes;
        }
        public void setNodes(List<String> nodes) {
            this.nodes = nodes;
        }
    }
 
    /**
     * 配置redisson --集群方式
     * Redisson是RedissonClient的实现类 
     * @return
     */
    @Bean(destroyMethod = "shutdown")
    public Redisson redisson() {
        List<String> clusterNodes = new ArrayList<>();
        for (int i = 0; i < this.getCluster().getNodes().size(); i++) {
            clusterNodes.add("redis://" + this.getCluster().getNodes().get(i));
        }
        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers()
                .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
        clusterServersConfig.setPassword(getPassword());
        return (Redisson) Redisson.create(config);
    }
    /**
     * 配置redisson --单节点
     * @return
     */
    @Bean(destroyMethod = "shutdown")
	public RedissonClient redissonClient() {
        Config config = new Config();
            String address = "redis://" + host + ":" + port;
            //使用json序列化方式
            config.setCodec(new JsonJacksonCodec());
            config.useSingleServer().setAddress(address).setPassword(password);
        
        return Redisson.create(config);
    }
 
}

六、Redission使用方法

https://github.com/redisson/redisson

1、RKeys

RKeys keys = client.getKeys();
//获取所有key值
Iterable<String> allKeys = keys.getKeys();
//获取所有模糊key值
Iterable<String> foundedKeys = keys.getKeysByPattern("key");
//删除多个key值
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
//删除模糊key值
long deletedKeysAmount = keys.deleteByPattern("test?");
//随机获取key
String randomKey = keys.randomKey();
//当前多少key数
long keysAmount = keys.count();

2、RBucket

Redisson的分布式RBucket Java对象是一种通用对象桶可以用来存放任类型的对象。 除了同步接口外,还提供了异步、响应式和RxJava2标准的接口。

(1)基本操作

//对象桶 单个元素的存储
RBucket<String> bucket = client.getBucket("REDIS_BUCKET_TEST");
//同步 存储元素进对象桶,无返回值
bucket.set("1");
//异步
bucket.setAsync("1");
//返回bucket中key的值,返回1
bucket.get();
//尝试对REDIS_BUCKET_TEST存值,如果已存在值就不存进去,存在就存进去
// (先查询是否存在,true则存入,false则返回)
boolean bl = bucket.trySet("2");
//第一个参数与存的值比较,相等就存入第二个参数
boolean bl = bucket.compareAndSet("2", "3");
//get返回并且存入新的值
String str =  bucket.getAndSet("4");

(2)批量操作

RBuckets buckets = client.getBuckets();
//获取多个buckets
Map<String, String> loadedBuckets = buckets.get("REDIS_BUCKET_TEST", "REDIS_BUCKET_TEST1", "REDIS_BUCKET_TEST2");
Map<String, Object> map = new HashMap<>();
map.put("REDIS_BUCKET_TEST", "0");
map.put("REDIS_BUCKET_TEST1","1");
// 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
buckets.trySet(map);
// 同时保存全部通用对象桶。
buckets.set(map);

3、二进制流Binary Stream

流的最大容量受Redis主节点的内存大小限制。

RBinaryStream stream = client.getBinaryStream("anyStream");
byte[] content = "ceshi".getBytes();
//设置流内容ceshi
stream.set(content);
InputStream is = stream.getInputStream();
byte[] readBuffer = new byte[512];
//读取流ceshi
is.read(readBuffer);
OutputStream os = stream.getOutputStream();
byte[] contentToWrite = "ceshi1".getBytes();
//写入流ceshi1,注意:不是覆盖是写入,最终流为ceshiceshi1
os.write(contentToWrite);

4、地理空间对象桶 Geospatial Bucket

RGeo<String> geo = client.getGeo("test");
//同步存储可存储多个
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
        new GeoEntry(15.087269, 37.502669, "Catania"));
//异步存储只能一个
geo.addAsync(37.618423, 55.751244, "Moscow");
//返回两个坐标的距离
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
//返回成员的地理位置
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
//暂时没研究
RFuture<Map<String, String>> future= geo.hashAsync("Palermo", "Catania");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);

5、原子型

(1)AtomicLong

RAtomicLong atomicLong = client.getAtomicLong("myAtomicLong");
//设置成3
atomicLong.set(3);
//原子地将当前值增加1。
atomicLong.incrementAndGet();
//获取值
atomicLong.get();

(2)AtomicDouble

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
//设置值
atomicDouble.set(2.81);
//加法并且返回加好后的值
atomicDouble.addAndGet(4.11);
//获取值
atomicDouble.get();

6、订阅发布

RTopic topic = redisson.getTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// 在其他线程或JVM节点
RTopic topic = redisson.getTopic("anyTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
// 订阅所有满足`topic1.*`表达式的话题
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
         Assert.fail();
    }
});

在Redis节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。

7、布隆过滤器 Bloom Filter

BloomFilter主要用于检索一个元素是否在集合中,优点是空间效率和查询效率比较高。缺点是存在误差率。

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

集群模式

RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample");
// 采用以下参数创建布隆过滤器
// expectedInsertions = 255000000
// falseProbability = 0.03
bloomFilter.tryInit(255000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

8、基数估计HyperLogLog

该对象可以在有限的空间内通过概率算法统计大量的数据。

RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);

log.count();

9、累加器

(1)LongAdder

通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

RLongAdder atomicLong = ...
atomicLong.destroy();

(2)DoubleAdder

RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();

当不再使用双精度浮点累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

10、限流器 RateLimiter

既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。该算法不保证公平性。

RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每1秒钟产生10个令牌
rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);

CountDownLatch latch = new CountDownLatch(2);
limiter.acquire(3);
// ...

Thread t = new Thread(() -> {
    limiter.acquire(2);
    // ...        
});

11、映射RMap

与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有的本地缓存功能。

RMap<String, String> map = client.getMap("anyMap");
//设置值,并返回先前的关联值,新key就返回null
String prevObject = map.put("123","1");
//判断hash是否存在,存在就不插入返回先前value.不存在返回null
String currentObject = map.putIfAbsent("123", "3");
//移除key。返回先前值
String obj = map.remove("123");
//比put快 true如果key是哈希中的新密钥,并且已设置值。 false如果密钥已经存在于哈希中并且值已更新
boolean bl = map.fastPut("1234", "1");
//批量移除key,返回移除的数量
long count = map.fastRemove("321");
//异步 设置值,并返回先前的关联值,新key就返回null
RFuture<String> putAsyncFuture = map.putAsync("321","1");
//返回结果而不会阻塞。
String putAsyncFutureStr =  putAsyncFuture.getNow();
//异步 true如果key是哈希中的新密钥,并且已设置值。 false如果密钥已经存在于哈希中并且值已更新
RFuture<Boolean> fastPutAsyncFuture = map.fastPutAsync("321","1");
//异步移除key
RFuture<Long> count =  map.fastRemoveAsync("321");

(1)元素淘汰 Eviction

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

(2)本地缓存

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
      // 用于淘汰清除本地缓存内的元素
      // 共有以下几种选择:
      // LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。
      // LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。
      // SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。
      // WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。
      // NONE - 永不淘汰清除缓存元素。
     .evictionPolicy(EvictionPolicy.NONE)
     // 如果缓存容量值为0表示不限制本地缓存容量大小
     .cacheSize(1000)
      // 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。
      // 断线重连的策略有以下几种:
      // CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存
      // LOAD - 在服务端保存一份10分钟的作废日志
      //        如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素
      //        如果断线时间超过了这个时间,则将清空本地缓存中所有的内容
      // NONE - 默认值。断线重连时不做处理。
     .reconnectionStrategy(ReconnectionStrategy.NONE)
      // 以下选项适用于不同本地缓存之间相互保持同步的情况
      // 缓存同步策略有以下几种:
      // INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素
      // UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素
      // NONE - 不做任何同步处理
     .syncStrategy(SyncStrategy.INVALIDATE)
      // 每个Map本地缓存里元素的有效时间,默认毫秒为单位
     .timeToLive(10000)
      // 或者
     .timeToLive(10, TimeUnit.SECONDS)
      // 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位
     .maxIdle(10000)
      // 或者
     .maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);

String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");

// 在不需要旧值的情况下可以使用fast为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");

RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");

map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

RLocalCachedMap<String, Integer> map = ...
map.destroy();

(3)集群下使用

RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");

SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

map.fastPut("321", new SomeObject());
map.fastRemove("321");

(4)映射监听器

RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);

int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
     @Override
     public void onUpdated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 新值
          event.getOldValue() // 旧值
          // ...
     }
});

int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
     @Override
     public void onCreated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
     @Override
     public void onExpired(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
     @Override
     public void onRemoved(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);

(5)LRU有界设定

分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。

RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为10
map.trySetMaxSize(10);

// 将该映射的最大容量限制设定或更改为10
map.setMaxSize(10);

map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);

12、多值映射Multipmap

Map中的一个字段值包含多个元素

(1)基于Set的Multimap

基于Set的Multimap不允许一个字段值包含有重复的元素。

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

Set<SimpleValue> allValues = map.get(new SimpleKey("0"));

List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
//替换值的集合,如果先前没有与该键关联的值,则为空集合
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

(2)基于List的MultipMap

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

List<SimpleValue> allValues = map.get(new SimpleKey("0"));

Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

13、RSet

保证了每个元素的唯一性

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

集群模式

RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

14、锁RLock

(1)可重入锁 Reentrant Lock (RLock)

所谓重入锁,指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁,而其他的线程是不可以的

只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

(2)指定加锁时间

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

(3)异步执行锁

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

(4)公平锁

每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁

RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();

(5)联锁 MultiLock

可以将多个RLock对象关联为一个联锁

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

(6)红锁RedissionRedLock

该对象也可以用来将多个RLock对象关联为一个红锁

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

(7)读写锁RReadWriteLock

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

15、信号量 RSemaphore

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

过期性信号量,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

16、闭锁 RCountDownLatch

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();



相关推荐

当然可以自己写redis的工具类,或者使用第三方开源jar包或代码,这里使用spring boot的集成类。 一、pom依赖 <dependency> <gro

一、概述 springboot中有两种方式使用kafka时,直接使用kafka-client连接kafka服务;另一种是使用spring-kafka框架来连接kafka。 1、版本兼容 使用时要注意版

websocket协议基于tcp的网络协议,它实现浏览器与器全双工通信。 spring boot2 +websocket 1、添加依赖 <pre clas

背景: 之前用spring boot+mybatis+oracle,现在要改成spring boot_mybatis+postgresql。 为了想让一套代码即可

共同点 都是用来表示Spring某个类是否可以接收HTTP请求。 不同点 @Controller标识一个spring类是Spring MVC c

系统异常捕获 参见:spring boot 2 全局异常处理 @ControllerAdvice(annotations = {RestController.class}) public class

mvn依赖 &lt;dependency&gt; &lt;groupId

从SSH(Structs/Spring/Hibernate)到SSM(SpringMVC/Spring/MyBatis),到现在一个S(Spring)就够的年代,可见Spring越来越重要了。<

@ConditionalOnMissingBean只能在@Bean注解的方法上使用。 可以给该注解传入参数例如@ConditionOnMissingBean(name = "exa

不同版本和回收资源方式 1、jedis版本&lt;3.0.0 JedisPool jedisPool