SpringBoot支持Redis的分布式锁
阅读数:155 评论数:0
跳转到新版页面分类
python/Java
正文
一、简介
分布式锁,其原理就是多台机器去争抢一个资源,谁争抢成功,那么就持有这把锁。
可以通过多种途径实现分布式锁,例如数据库,插入一条记录(唯一索引),谁插入成功,谁就持有;还可以通过zookeeper,谁创建节点成功,谁就持有锁。
二、使用RedisTemplate
@Component
public class DistributedLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 获得锁
*/
public boolean getLock(String lockId, long millisecond) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(lockId, "lock",
millisecond, TimeUnit.MILLISECONDS);
return success != null && success;
}
public void releaseLock(String lockId) {
redisTemplate.delete(lockId);
}
}
三、进一步完善(看门狗、守护进程)
对于超时操作,有一个专门的线程来监视,watchDog进行锁续命。
四、使用redission(与redisTemplate共用)
如果要想同是地使用redisTemplate和redission,不要使用redission-spring-boot-starter自动装配,而是自己来注册RedissinClient,这们RedisTemplate底层使用Connection实现就是LettuceConnection了。
1、pom引入
https://mvnrepository.com/artifact/org.redisson/redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
另外记得引入
<!-- springboot提供的自动配置和依赖注入 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
2、配置注入
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Value("${redis.master.name}")
private String masterName;
@Value("${redis.sentinel.ip1}")
private String sentinelIp1;
@Value("${redis.sentinel.ip2}")
private String sentinelIp2;
@Value("${redis.sentinel.ip3}")
private String sentinelIp3;
@Value("${redis.sentinel.port1}")
private String port1;
@Value("${redis.sentinel.port2}")
private String port2;
@Value("${redis.sentinel.port3}")
private String port3;
@Value("${redis.database}")
private int dataBase;
@Value("${redis.password}")
private String password;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSentinelServers().setMasterName(masterName)
.addSentinelAddress("redis://"+sentinelIp1+":"+port1)
.addSentinelAddress("redis://"+sentinelIp2+":"+port2)
.addSentinelAddress("redis://"+sentinelIp3+":"+port3).setPassword(password)
.setDatabase(dataBase).setTimeout(3600);
System.out.println("sentinelIp1 = " + sentinelIp1);
return Redisson.create(config);
}
}
集群方式
Config config = new Config();
config.useClusterServers().addNodeAddress("127.0.0.1:7000")
.addNodeAddress("127.0.0.1:7001")
.setPassword("password").setScanInterval(2000);
单实例方式
// redission通过redissonClient对象使用 // 如果是多个redis集群,可以配置
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson(){
Config config = new Config();
// 创建单例模式的配置
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("xudaze200129");
return Redisson.create(config);
}
3、加锁逻辑
@Autowired
private RedissonClient redissonClient;
public Map<String, List<IndexCategoryVo>> getIndexCategoryMapRedissionLock() {
// 占锁 没有拿到锁的会自动阻塞
// watchDog 机制 : 锁自动加了默认30秒过期
// 如果业务代码耗时长,锁也会自动续期
RLock lock = redissonClient.getLock(CategoryServiceImpl.LOCK);
lock.lock();
try {
//业务逻辑
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
五、只使用redission
1、pom依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.6</version>
</dependency>
引入次依赖后,无需再引入spring-boot-starter-data-redis
,其redisson-spring-boot-starter
内部已经进行了引入,且排除了Redis的Luttuce 以及 Jedis客户端
2、配置
spring:
redis:
host: 127.0.0.1
port: 6379
# password:
# database: 0 #指定数据库,默认为0
# timeout: 3000 #连接超时时间,单位毫秒,默认为0。也可以这么写:3s
# ssl: false # 是否启用SSL连接,默认false
# pool: #连接池配置
# max-active: 8 #最大活跃连接数,默认8个。
# max-idle: 8 #最大空闲连接数,默认8个。
# max-wait: -1 #获取连接的最大等待时间,默认-1,表示无限制,单位毫秒。
# #默认值可能会因为获取不到连接,导致事务无法提交,数据库被锁,大量线程处于等待状态的情况。
# min-idle: 0 #最小空闲连接数,默认0。
# sentinel:
# master: myMaster #哨兵master
# nodes: host1:port,host2:port #哨兵节点
# cluster:
# max-redirects: # 集群模式下,集群最大转发的数量
# nodes: host1:port,host2:port # 集群节点
3、RedissionConfig
//
@ConfigurationProperties(prefix = "spring.redis")
@Configuration
@Data
public class RedissonConfig {
// 读取配置文件里面的Redis信息
private String host;
private String port;
private String password;
private Cluster cluster;
public static class Cluster {
private List<String> nodes;
public List<String> getNodes() {
return nodes;
}
public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
}
/**
* 配置redisson --集群方式
* Redisson是RedissonClient的实现类
* @return
*/
@Bean(destroyMethod = "shutdown")
public Redisson redisson() {
List<String> clusterNodes = new ArrayList<>();
for (int i = 0; i < this.getCluster().getNodes().size(); i++) {
clusterNodes.add("redis://" + this.getCluster().getNodes().get(i));
}
Config config = new Config();
ClusterServersConfig clusterServersConfig = config.useClusterServers()
.addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
clusterServersConfig.setPassword(getPassword());
return (Redisson) Redisson.create(config);
}
/**
* 配置redisson --单节点
* @return
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
String address = "redis://" + host + ":" + port;
//使用json序列化方式
config.setCodec(new JsonJacksonCodec());
config.useSingleServer().setAddress(address).setPassword(password);
return Redisson.create(config);
}
}
六、Redission使用方法
https://github.com/redisson/redisson
RKeys keys = client.getKeys();
//获取所有key值
Iterable<String> allKeys = keys.getKeys();
//获取所有模糊key值
Iterable<String> foundedKeys = keys.getKeysByPattern("key");
//删除多个key值
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
//删除模糊key值
long deletedKeysAmount = keys.deleteByPattern("test?");
//随机获取key
String randomKey = keys.randomKey();
//当前多少key数
long keysAmount = keys.count();
Redisson的分布式RBucket Java对象是一种通用对象桶可以用来存放任类型的对象。 除了同步接口外,还提供了异步、响应式和RxJava2标准的接口。
(1)基本操作
//对象桶 单个元素的存储
RBucket<String> bucket = client.getBucket("REDIS_BUCKET_TEST");
//同步 存储元素进对象桶,无返回值
bucket.set("1");
//异步
bucket.setAsync("1");
//返回bucket中key的值,返回1
bucket.get();
//尝试对REDIS_BUCKET_TEST存值,如果已存在值就不存进去,存在就存进去
// (先查询是否存在,true则存入,false则返回)
boolean bl = bucket.trySet("2");
//第一个参数与存的值比较,相等就存入第二个参数
boolean bl = bucket.compareAndSet("2", "3");
//get返回并且存入新的值
String str = bucket.getAndSet("4");
(2)批量操作
RBuckets buckets = client.getBuckets();
//获取多个buckets
Map<String, String> loadedBuckets = buckets.get("REDIS_BUCKET_TEST", "REDIS_BUCKET_TEST1", "REDIS_BUCKET_TEST2");
Map<String, Object> map = new HashMap<>();
map.put("REDIS_BUCKET_TEST", "0");
map.put("REDIS_BUCKET_TEST1","1");
// 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
buckets.trySet(map);
// 同时保存全部通用对象桶。
buckets.set(map);
流的最大容量受Redis主节点的内存大小限制。
RBinaryStream stream = client.getBinaryStream("anyStream");
byte[] content = "ceshi".getBytes();
//设置流内容ceshi
stream.set(content);
InputStream is = stream.getInputStream();
byte[] readBuffer = new byte[512];
//读取流ceshi
is.read(readBuffer);
OutputStream os = stream.getOutputStream();
byte[] contentToWrite = "ceshi1".getBytes();
//写入流ceshi1,注意:不是覆盖是写入,最终流为ceshiceshi1
os.write(contentToWrite);
RGeo<String> geo = client.getGeo("test");
//同步存储可存储多个
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
new GeoEntry(15.087269, 37.502669, "Catania"));
//异步存储只能一个
geo.addAsync(37.618423, 55.751244, "Moscow");
//返回两个坐标的距离
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
//返回成员的地理位置
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
//暂时没研究
RFuture<Map<String, String>> future= geo.hashAsync("Palermo", "Catania");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);
(1)AtomicLong
RAtomicLong atomicLong = client.getAtomicLong("myAtomicLong");
//设置成3
atomicLong.set(3);
//原子地将当前值增加1。
atomicLong.incrementAndGet();
//获取值
atomicLong.get();
(2)AtomicDouble
RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
//设置值
atomicDouble.set(2.81);
//加法并且返回加好后的值
atomicDouble.addAndGet(4.11);
//获取值
atomicDouble.get();
RTopic topic = redisson.getTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(String channel, SomeObject message) {
//...
}
});
// 在其他线程或JVM节点
RTopic topic = redisson.getTopic("anyTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
// 订阅所有满足`topic1.*`表达式的话题
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.fail();
}
});
在Redis节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。
BloomFilter主要用于检索一个元素是否在集合中,优点是空间效率和查询效率比较高。缺点是存在误差率。
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
集群模式
RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample");
// 采用以下参数创建布隆过滤器
// expectedInsertions = 255000000
// falseProbability = 0.03
bloomFilter.tryInit(255000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
该对象可以在有限的空间内通过概率算法统计大量的数据。
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);
log.count();
(1)LongAdder
通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
RLongAdder atomicLong = ...
atomicLong.destroy();
(2)DoubleAdder
RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();
当不再使用双精度浮点累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。该算法不保证公平性。
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每1秒钟产生10个令牌
rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(2);
limiter.acquire(3);
// ...
Thread t = new Thread(() -> {
limiter.acquire(2);
// ...
});
与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有的本地缓存功能。
RMap<String, String> map = client.getMap("anyMap");
//设置值,并返回先前的关联值,新key就返回null
String prevObject = map.put("123","1");
//判断hash是否存在,存在就不插入返回先前value.不存在返回null
String currentObject = map.putIfAbsent("123", "3");
//移除key。返回先前值
String obj = map.remove("123");
//比put快 true如果key是哈希中的新密钥,并且已设置值。 false如果密钥已经存在于哈希中并且值已更新
boolean bl = map.fastPut("1234", "1");
//批量移除key,返回移除的数量
long count = map.fastRemove("321");
//异步 设置值,并返回先前的关联值,新key就返回null
RFuture<String> putAsyncFuture = map.putAsync("321","1");
//返回结果而不会阻塞。
String putAsyncFutureStr = putAsyncFuture.getNow();
//异步 true如果key是哈希中的新密钥,并且已设置值。 false如果密钥已经存在于哈希中并且值已更新
RFuture<Boolean> fastPutAsyncFuture = map.fastPutAsync("321","1");
//异步移除key
RFuture<Long> count = map.fastRemoveAsync("321");
(1)元素淘汰 Eviction
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
(2)本地缓存
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
// 用于淘汰清除本地缓存内的元素
// 共有以下几种选择:
// LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。
// LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。
// SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。
// WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。
// NONE - 永不淘汰清除缓存元素。
.evictionPolicy(EvictionPolicy.NONE)
// 如果缓存容量值为0表示不限制本地缓存容量大小
.cacheSize(1000)
// 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。
// 断线重连的策略有以下几种:
// CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存
// LOAD - 在服务端保存一份10分钟的作废日志
// 如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素
// 如果断线时间超过了这个时间,则将清空本地缓存中所有的内容
// NONE - 默认值。断线重连时不做处理。
.reconnectionStrategy(ReconnectionStrategy.NONE)
// 以下选项适用于不同本地缓存之间相互保持同步的情况
// 缓存同步策略有以下几种:
// INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素
// UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素
// NONE - 不做任何同步处理
.syncStrategy(SyncStrategy.INVALIDATE)
// 每个Map本地缓存里元素的有效时间,默认毫秒为单位
.timeToLive(10000)
// 或者
.timeToLive(10, TimeUnit.SECONDS)
// 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位
.maxIdle(10000)
// 或者
.maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);
String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");
// 在不需要旧值的情况下可以使用fast为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");
RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
RLocalCachedMap<String, Integer> map = ...
map.destroy();
(3)集群下使用
RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
map.fastPut("321", new SomeObject());
map.fastRemove("321");
(4)映射监听器
RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);
int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
@Override
public void onUpdated(EntryEvent<Integer, Integer> event) {
event.getKey(); // 字段名
event.getValue() // 新值
event.getOldValue() // 旧值
// ...
}
});
int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
@Override
public void onCreated(EntryEvent<Integer, Integer> event) {
event.getKey(); // 字段名
event.getValue() // 值
// ...
}
});
int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
@Override
public void onExpired(EntryEvent<Integer, Integer> event) {
event.getKey(); // 字段名
event.getValue() // 值
// ...
}
});
int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
@Override
public void onRemoved(EntryEvent<Integer, Integer> event) {
event.getKey(); // 字段名
event.getValue() // 值
// ...
}
});
map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);
(5)LRU有界设定
分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。
RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为10
map.trySetMaxSize(10);
// 将该映射的最大容量限制设定或更改为10
map.setMaxSize(10);
map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);
Map中的一个字段值包含多个元素
(1)基于Set的Multimap
基于Set的Multimap不允许一个字段值包含有重复的元素。
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
//替换值的集合,如果先前没有与该键关联的值,则为空集合
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
(2)基于List的MultipMap
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
保证了每个元素的唯一性
RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
集群模式
RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
(1)可重入锁 Reentrant Lock (RLock)
所谓重入锁,指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁,而其他的线程是不可以的
只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException
错误
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
(2)指定加锁时间
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
(3)异步执行锁
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
(4)公平锁
每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
(5)联锁 MultiLock
可以将多个RLock
对象关联为一个联锁
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
(6)红锁RedissionRedLock
该对象也可以用来将多个RLock
对象关联为一个红锁
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
(7)读写锁RReadWriteLock
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
过期性信号量,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();