Distributed locks solve mutually exclusive access to shared resources by the processes and threads running in a distributed environment, coordinating how their access to those resources is scheduled
- Mutual exclusion: at any moment, only one thread can hold the lock.
- Reentrancy: a thread that already holds the lock is allowed to acquire it again
- Timeout release: a lock held past its timeout can be released, preventing deadlock and needless resource waste
- Automatic renewal: automatically extends the lock's expiration time so the lock does not expire and get released while the business logic is still running.
- Blocking and non-blocking: both blocking and non-blocking acquisition modes
- Safety: no matter what happens, the lock's consistent state must not be compromised
- High availability: lock operations must be highly available
- High performance: lock operations must respond quickly, and the server must sustain high throughput
Avoiding duplicated work, e.g. a scheduled task at a given moment only needs to run once, or making an interface idempotent. When the lock is used only for this purpose, the performance requirements are high
Some logic has concurrency problems where concurrent execution corrupts data; a distributed lock can serialize the execution and keep the data correct
- Usage example
// Lock acquisition
String lockKey = "lockKey";
String value = "uuid";
SetParams setParams = SetParams.setParams().nx().ex(5);
String result = jedis.set(lockKey, value, setParams);
if ("OK".equals(result)) {
System.out.println("Lock acquired");
}
// Lock release
jedis.del("lockKey");
// The release above is flawed: it deletes the key without verifying that we still hold the lock; the correct approach is a Lua script
List<String> keys = new ArrayList<String>();
keys.add("lockKey");
List<String> args = new ArrayList<String>();
args.add("uuid");
Long res = (Long) jedis.eval("if redis.call('get', KEYS[1])==ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end", keys, args);
if (res == 1) {
// Lock released
}
- Mutual exclusion: implemented with Redis's setNX, which sets the key only if it does not already exist and fails otherwise, with the return value telling the two cases apart.
- Timeout release: Redis can set an expiration time on a key and deletes it once it expires
- Safety: Redis is not a strongly reliable store; in a master-replica architecture, data not yet replicated to the replicas is lost when the master crashes, so safety cannot be guaranteed
- High availability: with master-replica or cluster architectures, the lock is unavailable while a replica is being promoted after a master crash;
- High performance: Redis performance is very high
- High performance;
- Simple to use, low code complexity
- Incomplete feature set; implementing the rest yourself adds considerable complexity;
- Safety problem: Redis is not a strongly reliable store; in a master-replica architecture, data not yet replicated to the replicas is lost when the master crashes, so safety cannot be guaranteed;
- Availability problem: with master-replica or cluster architectures, the lock is unavailable while a replica is being promoted after a master crash;
- Simple to implement and fast, but the feature support is thin and it has availability and safety problems, so the use cases are narrow;
- Because of the Redis availability and safety problems, it is not suitable for data-correctness scenarios;
- The redisson framework packages a complete set of distributed-lock operations on top of redis, rounding out the feature set;
Implemented with the Redis hash structure plus pub/sub: the key is the lock key, with an expiration time on the key for timeout release; inside the hash, the field is the thread's unique identifier and the value stores the hold count; pub/sub implements blocking acquisition.
- Usage example
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
String lockKey = "lockKey";
RLock lock = redissonClient.getLock(lockKey);
// Acquire the lock
lock.lock();
// Release the lock
lock.unlock();
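Beyond the blocking lock()/unlock() pair shown above, RLock also offers a bounded acquisition; a small usage sketch (the timeout values are illustrative):
// Wait up to 3s for the lock; hold it for at most 10s (an explicit leaseTime disables the watchdog)
boolean acquired = lock.tryLock(3, 10, TimeUnit.SECONDS);
if (acquired) {
    try {
        // critical section
    } finally {
        lock.unlock();
    }
}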
- Redis Lua scripting: Lua is a scripting language; a Lua script can be sent to Redis to implement complex logic, and the whole script executes as one atomic operation; redisson implements its distributed-lock features with Lua scripts.
- Data structure design:
{
    "lockkey": {
        "clientId:threadId": "<hold count>"
    }
}
Using the Redis hash structure: lockkey is the key, with an expiration time on the key for lock expiry; inside the hash the field "clientId:threadId" uniquely identifies a thread in the distributed environment, and the value is the number of times the lock is held.
- Lua script analysis: acquisition logic
-- KEYS[1] = lockkey
-- ARGV[1] = leaseTime
-- ARGV[2] = client:threadId
if (redis.call('exists', KEYS[1]) == 0) then -- does the lock key not exist?
    -- the lock is free
    redis.call('hincrby', KEYS[1], ARGV[2], 1); -- record the holder client:threadId with hold count 1
    redis.call('pexpire', KEYS[1], ARGV[1]); -- set the lock's expiration time
    return nil;
end;
-- reaching here, the lock already has a holder
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- is the holder ourselves?
    -- the holder is ourselves: acquiring again is a reentrant acquisition
    redis.call('hincrby', KEYS[1], ARGV[2], 1); -- reentrancy: hold count + 1
    redis.call('pexpire', KEYS[1], ARGV[1]); -- reset the expiration time
    return nil;
end;
-- reaching here, the lock is held by another thread
return redis.call('pttl', KEYS[1]); -- return the remaining TTL, used for the blocking wait
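For intuition, a hedged sketch of driving a script of this shape directly through Jedis (a simplified stand-in, not Redisson's internal call path; the key and ids are illustrative):
String acquireScript =
        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; " +
        "return redis.call('pttl', KEYS[1]);";
// null means we acquired (or re-entered) the lock; otherwise the holder's remaining TTL comes back
Object ttl = jedis.eval(acquireScript,
        Collections.singletonList("lockkey"),
        Arrays.asList("30000", "client-1:42")); // ARGV[1] = leaseTime in ms, ARGV[2] = client:threadId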
- Lua script analysis: release logic
-- KEYS[1] = lockkey
-- KEYS[2] = redisson_lock__channel:{lockkey}
-- ARGV[1] = 0
-- ARGV[2] = leaseTime
-- ARGV[3] = client:threadId
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then -- verify that the releasing thread is the holder
    return nil; -- not the holder: stop
end;
-- we are the holder
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -- hold count - 1
if (counter > 0) then -- counter > 0 happens under reentrancy
    -- reentrant case: the hold count is decremented, but the lock is not yet released
    redis.call('pexpire', KEYS[1], ARGV[2]); -- refresh the expiration time
    return 0;
else
    -- reaching here, the hold count dropped to 0: actually release
    redis.call('del', KEYS[1]); -- deleting the key is the release
    redis.call('publish', KEYS[2], ARGV[1]); -- broadcast to the clients waiting for the lock
    return 1;
end;
return nil;
The logic above covers the implementation of mutual exclusion, reentrancy, timeout release, lock release, and blocking/wakeup;
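The reentrancy just described is visible directly from the API; a small usage sketch (the lock name is illustrative):
RLock lock = redissonClient.getLock("reentrantDemo");
lock.lock();   // hold count 1: the hash field is created
lock.lock();   // same thread re-enters: hold count 2
lock.unlock(); // hold count back to 1: the key still exists
lock.unlock(); // hold count 0: the key is deleted and waiters are notified via pub/sub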
- Automatic renewal analysis
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
if (ttlRemaining == null) { // ttlRemaining == null means acquisition succeeded; ttlRemaining carries the lock's remaining TTL only on failure
if (leaseTime > 0) { // an explicit lease time was given, so no automatic renewal is done
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// start the scheduled task that renews automatically
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
// Core auto-renewal logic
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renew the expiration via a Lua script: the holder consistency check requires atomicity, hence Lua
// renewal script: if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) { // res == true: we are still the holder
renewExpiration(); // schedule the next renewal
} else {
cancelExpirationRenewal(null); // no longer the holder: cancel the renewal task
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // the renewal interval is one third of the lease time; with the default 30s lease, renewals run every 10s
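The 30s lease behind that default 10s interval is Redisson's lock watchdog timeout, which is configurable; a usage sketch:
Config config = new Config();
config.setLockWatchdogTimeout(60000); // 60s lease, so the watchdog renews roughly every 20s
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient client = Redisson.create(config);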
- High performance
- Complete feature set
- Safety problem: Redis is not a strongly reliable store; in a master-replica architecture, data not yet replicated to the replicas is lost when the master crashes, so safety cannot be guaranteed
- Availability problem: with master-replica or cluster architectures, the lock is unavailable while a replica is being promoted after a master crash;
- Full-featured and fast, well suited to scenarios that only need to avoid duplicated work;
- Because of the Redis availability and safety problems, not suitable for data-correctness scenarios;
- redlock was designed to solve Redis's availability and safety problems
- Before explaining how redlock works, consider the root cause of those problems, which I would summarize as a single point of failure: in a master-replica architecture a master crash can lose data and cannot be recovered immediately. redlock's idea is multiple independent nodes: it sends the acquire request to each node in turn, and the operation succeeds once more than half of the nodes return success; if more than half fail, the nodes that did acquire release the lock. With 5 nodes, 2 may fail, so the availability and data-loss problems are solved; the data-correctness problem is not, as analyzed in detail later;
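The quorum arithmetic above in a minimal sketch (plain Java for illustration, not Redisson's API):
// With n = 5 redlock nodes, 3 successes form a quorum and up to 2 nodes may fail.
int n = 5;
int quorum = n / 2 + 1;           // 3
int failuresAllowed = n - quorum; // 2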
- Usage example
// Create multiple Redis clients, each pointing at a different Redis node
RedissonClient redissonClient;
RedissonClient redissonClient2;
RedissonClient redissonClient3;
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://127.0.0.2:6379");
redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://127.0.0.3:6379");
redissonClient3 = Redisson.create(config3);
String lockKey = "lockkey";
// Create the red lock; a red lock is essentially an aggregation of ordinary locks
RedissonRedLock redLock = new RedissonRedLock(redissonClient.getLock(lockKey), redissonClient2.getLock(lockKey), redissonClient3.getLock(lockKey));
redLock.lock(); // acquire the lock
redLock.unlock(); // release the lock
- Acquisition
// RedissonRedLock analysis
// RedissonRedLock extends RedissonMultiLock (a "multi lock"), which aggregates several RLocks
public class RedissonRedLock extends RedissonMultiLock {
// several ordinary locks aggregated into one red lock
public RedissonRedLock(RLock... locks) {
super(locks);
}
// maximum number of locks allowed to fail; the operation succeeds once locks.size()/2 + 1 locks are acquired
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
// Acquisition logic
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit(); // number of failures allowed; with 5 nodes this is 2
List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // locks acquired successfully
// iterate over the RLocks, sending the acquire command to every node
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
// acquired: add it to the collection
acquiredLocks.add(lock);
} else {
// this node failed to acquire the lock
// total nodes - successes == allowed failures means the successes already satisfy the quorum
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
// more than half of the nodes acquired the lock: overall acquisition succeeds, exit the loop
break;
}
if (failedLocksLimit == 0) {
// the number of failed nodes has reached the limit: overall acquisition fails
// release every lock acquired successfully so far
unlockInner(acquiredLocks);
if (waitTime == -1) {
// non-blocking mode (waitTime == -1): finish immediately
return false;
}
failedLocksLimit = failedLocksLimit(); // reset the failure budget
acquiredLocks.clear(); // clear the set of successfully acquired locks
// rewind the RLock iterator; these three resets mean "start over from scratch", retrying until success
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--; // 允许失败次数减一
}
}
if (remainTime != -1) {
// for acquisitions with a wait time: once the wait budget is exhausted, release and fail
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
acquiredLocks.stream()
.map(l -> (RedissonLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.toCompletableFuture().join());
}
return true;
}
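A usage sketch of the waitTime/leaseTime path analyzed above (the values are illustrative):
// Wait up to 500ms across the nodes; hold the lock for at most 10s
boolean ok = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (ok) {
    try {
        // critical section
    } finally {
        redLock.unlock();
    }
}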
- Release
// Release logic
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
// release is very simple: loop over all the RLocks and release each one
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.toCompletableFuture().join();
}
}
- High performance: with more nodes and serial acquisition the response time grows, but a Redis-based implementation is still fast
- High availability: tolerates n - (n/2+1) failed nodes
- Complete feature set
- Improved safety: n - (n/2+1) nodes may crash and lose data, but some scenarios still have safety problems, detailed under the problems below;
- Requires more resources: the official recommendation is 5 nodes, meaning 5 masters; with one replica per master that is 10 nodes
- Remaining safety problems:
Scenario 1: suppose 5 nodes A, B, C, D, E. Client 1 acquires on A, B, C and fails on D, E; more than half succeeded, so client 1 holds the lock. Node C then crashes and restarts, losing the data it had not yet persisted. Client 2 then requests the lock and succeeds on C, D, E, so client 2 also holds the lock: two clients hold it at once. A mitigation is delayed restart, waiting until the lock has expired before restarting the node; the problem is that how long to wait is hard to estimate;
Scenario 2: clock jumps. Again suppose 5 nodes A, B, C, D, E; client 1 acquires on A, B, C and fails on D, E, so it holds the lock. Node C's clock jumps forward, so the lock expires and is released early; client 2 then acquires on C, D, E, and again two clients hold the lock at the same time;
Scenario 3: a long client-side pause lets the server-side lock expire and be released, and another client takes it. All three scenarios boil down to two clients holding the lock simultaneously. For this, the solution Martin Kleppmann proposed (see the figure below) uses optimistic locking: introduce a token, a globally increasing sequence number handed out with the lock; when operating on the data, compare it with the data's token, and only the larger token may proceed. redisson does not provide this; the difficulty is keeping the token consistent and monotonically increasing across all nodes, which requires a strong-consistency algorithm. Details: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
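A minimal sketch of that fencing-token idea on the resource side (illustrative, not a redisson feature): the storage layer remembers the largest token it has seen and rejects writes carrying a smaller one.
class FencedStore {
    private long highestToken = -1;
    private String data;

    synchronized boolean write(long token, String value) {
        if (token < highestToken) {
            return false; // a stale lock holder: reject the write
        }
        highestToken = token;
        data = value;
        return true;
    }
}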
- On top of single-node redisson, redlock uses multiple nodes to address the availability and safety problems (without fully solving them);
- redlock is still fast; it fits high-availability, high-concurrency scenarios that only need to avoid duplicated work very well;
- I do not recommend it for data-correctness scenarios, per the problem analysis above. But if you judge those failure scenarios to be extremely unlikely, avoidable with careful operations, or repairable by other means if they do occur, redlock can still be a good fit.
- To truly solve the data-correctness problem you need a distributed lock built on a strong-consistency (paxos-family) architecture, such as zookeeper
zookeeper implements a file-system-like service on top of the strongly consistent ZAB protocol. Among its node types are ephemeral sequential nodes: ephemeral means the node is deleted automatically when the client's connection to the server drops; sequential means that when nodes with the same name are created, a sequence number that increases with creation order is appended to the name. zookeeper's watch feature monitors node changes and notifies the client when one occurs. To acquire the lock, a client creates an ephemeral sequential node; the smallest sequence number holds the lock, and a client that did not get the lock blocks, watching the node immediately before its own; when that node is released, the client is notified to acquire the lock.
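As a usage sketch, Curator's InterProcessMutex packages exactly this recipe (the address, path, and timeout are illustrative):
CuratorFramework client = CuratorFrameworkFactory.newClient(
        "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/locks/orderLock");
if (mutex.acquire(3, TimeUnit.SECONDS)) { // bounded blocking acquisition
    try {
        // critical section
    } finally {
        mutex.release();
    }
}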
- Acquisition analysis
The client sends an acquire request and creates an ephemeral sequential node in zookeeper, then lists all nodes under the lock directory and checks whether its own sequence number is the smallest. If so, it has the lock; if not, acquisition fails and it watches the node immediately before its own; when the lock-holding node is released, the next node's client is notified. Below is an analysis of the Curator implementation
// millisToWait: maximum wait time; ourPath: the path of our own node (creation logic not shown)
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false; // whether the current node holds the lock
boolean doDelete = false; // whether the maximum wait time was reached
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// while the client state is STARTED and the lock has not been acquired
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren(); // list all nodes under the lock directory, sorted
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // our own node's sequence name
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // check whether we qualify for the lock (analyzed below)
if ( predicateResults.getsTheLock() )
{
// lock acquired
haveTheLock = true;
}
else
{
// acquisition failed
// find the node immediately before our own; we will watch it
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// watch the previous node; the watcher (analyzed below) will wake up the wait
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null ) // a wait time was given
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
// wait time exhausted: mark our ephemeral node for deletion below
doDelete = true;
break;
}
wait(millisToWait); // bounded blocking wait
}
else
{
wait(); // unbounded blocking wait
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
// the wait expired without acquiring the lock: delete our node
deleteOurPath(ourPath);
}
}
return haveTheLock; // return whether the lock was acquired
}
// children: the sorted node names under the lock directory; sequenceNodeName: our own node's name; maxLeases: 1
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
int ourIndex = children.indexOf(sequenceNodeName); // position of our node in the list
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases; // after sorting, the smallest sequence number is at index 0, so index 0 means we acquire the lock
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); // on failure, pick the previous node to watch
return new PredicateResults(pathToWatch, getsTheLock);
}
// the watch fires when the watched node is deleted
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher()
{
notifyAll(); // wake up the threads blocked in wait()
}
- Release analysis
Release is very simple: just delete the node. Curator's implementation:
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath); // delete the node
}
catch ( KeeperException.NoNodeException e )
{
}
}
- Reentrancy analysis
Reentrancy is implemented on the client side: a map whose key is the thread object and whose value holds the lock count; a reentrant acquisition increments the count. Curator's implementation:
// lock data
private static class LockData
{
final Thread owningThread; // the owning thread
final String lockPath; // the node path
final AtomicInteger lockCount = new AtomicInteger(1); // the hold count
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
// Acquire
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread(); // the current thread
// threadData is a ConcurrentMap; look up the current thread's LockData
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// we already hold the lock: a reentrant acquisition
// increment the hold count
lockData.lockCount.incrementAndGet();
return true;
}
// the lock is not yet held
// acquire it; the core logic was analyzed above
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// acquired: record the LockData
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
// acquisition failed
return false;
}
- Timeout release and automatic renewal use zookeeper's built-in mechanism: client and server maintain a session, the client sends periodic heartbeats that renew the session's expiration, and if the server detects that the session has expired, the ephemeral sequential node is deleted, i.e. the lock is released
- High availability: a zookeeper cluster tolerates n - (n/2+1) node failures, but if the leader crashes the cluster is unavailable until a new leader is elected
- Safety: zookeeper uses the strongly consistent ZAB algorithm, so the data on all nodes agrees. Can zookeeper solve the safety problem discussed at the end of the redlock chapter, where two clients hold the lock at once? With only the recipe described above, no. To solve it completely, LockData needs a zxid: every ephemeral node creation produces a zxid, and using the zxid as an optimistic-lock token does solve the problem.
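A hedged sketch of obtaining such a token with Curator (one possible shape, not Curator's lock API): the czxid of the created lock node is assigned in monotonically increasing order and can serve as the fencing token from the redlock chapter.
// ourLockPath: the ephemeral sequential node we created when acquiring the lock
// Stat is org.apache.zookeeper.data.Stat
Stat stat = client.checkExists().forPath(ourLockPath);
long fencingToken = stat.getCzxid(); // creation zxid: monotonically increasing across the cluster
// hand fencingToken to the storage layer and reject writes whose token is below the last seen one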
Safety and data correctness are high
- Performance is poor; zookeeper does not suit write-heavy workloads
- The feature set is incomplete; some features you must build yourself
- Availability is middling: it tolerates n - (n/2+1) node failures, but while a crashed leader is being replaced the cluster is unavailable
Usable in low-concurrency scenarios with strict data-correctness requirements; not suitable for high-concurrency or high-availability scenarios
wlock is a distributed lock built on wpaxos, an implementation of the strongly consistent Multi-Paxos algorithm and also a 58 open-source project. It is designed specifically for distributed locking and offers high performance, high availability, and high reliability. On acquisition the server checks whether the lock is free: if so it writes the lock data; otherwise the client is placed in a watch queue, waits for the release wakeup, and is notified once it acquires the lock.
- Overall architecture
There are three roles: clients, the server cluster, and the registry. Starting with the core server cluster: it consists of multiple nodes, each hosting 15 independent groups; the same-numbered groups across nodes form one paxos cluster, so the server cluster can be viewed as 15 paxos clusters. Each paxos cluster elects a master, giving 15 masters spread evenly across the nodes. The master placement is reported to the registry, which pushes it to clients; a client connects to the master nodes and sends acquire requests, each routed by lockkey to determine the target master node (see the sketch below);
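A sketch of the lockkey routing this implies (the modulo scheme is an assumption for illustration, not wlock's actual routing code):
// Route a lock key to one of the 15 groups; each group is served by its own master node.
int groupCount = 15;
int groupId = (lockKey.hashCode() & Integer.MAX_VALUE) % groupCount;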
- High-performance design
The client routes each lockkey to a master group node. Within a group, multiple threads handle requests, each lock request routed by lockkey to a thread; the threads generate propose requests concurrently into queues, the queued proposes are merged into batch proposes sent to the followers, and the lock data finally lands in RocksDB, a high-performance key-value local database. Summarizing the high-performance design:
Parallel processing: a multi-group, multi-threaded architecture. Each group is an independently running paxos cluster with its own master, serving requests on its own; each group has several processing threads handling requests in parallel, using resources to the fullest.
Using every node's resources: with the multi-group architecture, each group is an independent paxos cluster with its own master, the masters are spread evenly across all nodes, and requests are handled by masters, so every node participates in request processing and no node's resources sit idle.
Load balancing: routing by lockkey spreads lock requests across the master group nodes, balancing the traffic.
Network efficiency: propose requests are merged into batch proposes and sent in bulk. The merging and batching itself is not hard; the hard part is guaranteeing that, under multi-threaded parallel execution, requests for the same lockkey are merged and executed in order. The solution: route all requests for one lockkey to the same thread's queue, and let single-threaded execution guarantee the ordering (see the sketch below).
Appropriate lock storage: RocksDB, a high-performance key-value local database, with the lockkey as key and the lock info as value; simple and efficient.
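A minimal sketch of that same-key-same-thread ordering trick (the class shape and pool size are illustrative assumptions):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class KeyOrderedExecutor {
    // one single-threaded queue per worker: requests for a key always land on the same worker,
    // so single-threaded execution preserves their order while different keys run in parallel
    private final ExecutorService[] workers = new ExecutorService[8];

    KeyOrderedExecutor() {
        for (int i = 0; i < workers.length; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    void submit(String lockKey, Runnable proposeTask) {
        int idx = (lockKey.hashCode() & Integer.MAX_VALUE) % workers.length;
        workers[idx].execute(proposeTask);
    }
}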
- Acquisition analysis
The figure above shows the acquisition flow; the key points below analyze wlock's implementation. rocksdb: an efficient key-value database that stores the lock info; when a client acquires the lock, the lock info is stored in the following format
// Lock info
public class ReentrantLockValue {
// client protocol version
private byte version;
// latest lock version; essentially paxos's increasing instanceID, analogous to zookeeper's zxid
private long lockVersion;
// lock status, currently unused
private int status;
// lock type; currently reentrant locks and read-write locks are supported
private int lockType;
/**
* the lock holder
* for a read-write lock, the holder of the write lock
*/
private LockOwnerInfo lockOwnerInfo;
// number of read locks
private int readOwnerCount;
// read-lock holders, key: ip+pid+threadId
private Map<String, LockOwnerInfo> rLockOwnerInfos = new HashMap<String, LockOwnerInfo>();
}
// Lock holder info
public class LockOwnerInfo {
private int ip;
private long threadId;
private int pid;
// lock expiration time
private long expireTime;
// lock version
private long lockVersion;
}
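A minimal sketch of what "lock info in RocksDB" amounts to, using the RocksDB Java API (the path and serialization are assumptions for illustration):
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class LockStoreSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/lockdb")) {
            byte[] key = "lockkey".getBytes();
            db.put(key, "serialized ReentrantLockValue".getBytes()); // acquire: store the lock info
            byte[] value = db.get(key);                              // look up the current holder
            db.delete(key);                                          // release: delete the lock info
        }
    }
}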
State machine: the proposes initiated by the master eventually form an ordered paxos log that the state machine executes linearly, so master and followers execute in the same order. The lock state machine:
// LockStateMachine
// groupIdx: the group; instanceID: paxos's increasing ID, incremented by 1 per propose
// paxosValue: the lock request; smCtx: context; the state machine puts its result here, readable where the propose was initiated
public boolean execute(int groupIdx, long instanceID, byte[] paxosValue, SMCtx smCtx) {
if(!MigrateService.getInstance().isSyncToMigratePoint(groupIdx)) {
LOGGER.error("LockStateMachine not Sync To Migrate Point. groupId: {} instanceId: {}", groupIdx, instanceID);
return false;
}
long start = System.currentTimeMillis();
byte protocolType = paxosValue[BaseLockDO.PROYOCOL_TYPE_OFFSET];
LockSmCtx lockSmCtx = null;
if (smCtx != null && smCtx.getpCtx() != null) {
lockSmCtx = (LockSmCtx) smCtx.getpCtx();
}
if (lockSmCtx != null) {
lockSmCtx.setLockRet(LockResult.SUCCESS);
}
boolean result = false;
if (ProtocolType.ACQUIRE_LOCK == protocolType) { // acquire operation
AcquireLockDO acquireLockDO;
acquireLockDO = AcquireLockDO.fromBytes(paxosValue);
// the group version, incremented during group migration; default 0
long groupVersion = GroupMetaService.getInstance().getGroupVersion(groupIdx);
// compute the lock version
long lockVersion = (groupVersion << 48) + instanceID;
// run the acquire logic, analyzed below
result = reentrantLock(acquireLockDO.getLockType()).tryAcquireLock(acquireLockDO, lockVersion, groupIdx, lockSmCtx);
} else if (ProtocolType.RENEW_LOCK == protocolType) { // renew operation
RenewLockDO renewLockDO;
renewLockDO = RenewLockDO.fromBytes(paxosValue);
result = reentrantLock(renewLockDO.getLockType()).renewLock(renewLockDO, groupIdx, lockSmCtx);
} else if (ProtocolType.RELEASE_LOCK == protocolType) { // release operation
ReleaseLockDO releaseLockDO;
releaseLockDO = ReleaseLockDO.fromBytes(paxosValue);
result = reentrantLock(releaseLockDO.getLockType()).releaseLock(releaseLockDO, groupIdx, lockSmCtx);
} else if (ProtocolType.DELETE_LOCK == protocolType) { // delete operation
DeleteLockDO deleteLockDO;
deleteLockDO = DeleteLockDO.fromBytes(paxosValue);
result = reentrantLock(deleteLockDO.getLockType()).deleteLock(deleteLockDO, groupIdx, lockSmCtx);
} else if (lockSmCtx != null) {
LOGGER.error("unknow protocol {}", protocolType);
lockSmCtx.setLockRet(LockResult.PROTOCOL_TYPE_ERROR);
}
checkpointManager.executeForCheckpoint(groupIdx, super.getSMID(), instanceID, paxosValue); // save to the checkpoint, a snapshot starting at some instanceId
long cost = System.currentTimeMillis()-start;
if(cost>200){
LOGGER.error("lock sm groupid {} execute cost {}",groupIdx,cost);
}
return result;
}
// ReentrantLock
// Acquire logic. The flow is simple because most validation is done earlier: the state machine runs after the propose, which is relatively expensive, so only requests that actually qualify trigger a propose
public boolean tryAcquireLock(AcquireLockDO acquireLockDO, long instanceID, int groupIdx, LockSmCtx smCtx) {
String key = acquireLockDO.getLockKey();
// build the lock info
ReentrantLockValue lockvalue = createLockvalue(acquireLockDO, instanceID);
try {
Optional<ReentrantLockValue> lock = lockRepositroy.getLock(key, groupIdx);
if (lock.isPresent() && lock.get().getLockVersion() > acquireLockDO.getFencingToken()) {
// the lock exists: validate the lock version
if (smCtx != null) {
smCtx.setLockRet(LockResult.TOKEN_ERROR);
}
return true;
}
// the lock info is stored into rocksdb
lockRepositroy.lock(key, lockvalue, groupIdx);
if (smCtx != null) {
smCtx.setFencingToken(instanceID);
smCtx.setExpireTime(acquireLockDO.getExpireTime());
}
return true;
} catch (LockException e) {
LOGGER.error("groupid {} {} try lock key : {} error.", groupIdx, acquireLockDO.getHost(), key);
LOGGER.error(e.getMessage(), e);
if (smCtx != null) {
smCtx.setLockRet(LockResult.EXCEPTION);
}
return false;
}
}
Conditions that permit acquisition: the lock is not held, the lock has expired, or this is a reentrant acquisition, which has the highest priority. Analyzed below alongside the code
// ReentrantLockService
// acquire logic
public boolean tryAcquireLock(LockContext lockContext, int groupId) {
AcquireLockRequest acquireLockRequest = new AcquireLockRequest();
try {
acquireLockRequest.fromBytes(lockContext.getBuf());
} catch (ProtocolException e) {
LOGGER.error("try acquire lock error", e);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
long version = 0L;
String key = acquireLockRequest.getLockKey();
Optional<ReentrantLockValue> lock = null;
try {
lock = lockRepository.getLock(key, groupId); // read the lock info from rocksdb
} catch (LockException e) {
LOGGER.error("{} acquire lock key : {} error.", acquireLockRequest.getHost(), key, e);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
ReentrantLockValue reentrantLockValue;
if (lock.isPresent()) { // is the lock held?
reentrantLockValue = lock.get();
LockOwnerInfo lockOwnerInfo = reentrantLockValue.getLockOwnerInfo(); // holder info
version = reentrantLockValue.getLockVersion(); // the lock version
if (!lockOwnerInfo.isExpire()) { // has the lock expired?
// check whether this is a reentrant acquisition
if (lock.get().getLockOwnerInfo().equals(acquireLockRequest.getHost(), acquireLockRequest.getThreadID(), acquireLockRequest.getPid())) {
AcquireLockDO acquireLockDO = AcquireLockDO.fromRequest(acquireLockRequest, version);
SMCtx ctx = createCtx();
byte[] proposeMsg;
try {
proposeMsg = acquireLockDO.toBytes();
} catch (ProtocolException e) {
LOGGER.error("try acquire lock error", e);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
// reentrancy allows acquisition: send the propose to the followers; the acquire state machine (analyzed above) runs linearly on every node
ProposeResult propose = paxosService.batchPropose(proposeMsg, groupId, ctx, acquireLockRequest.getRegistryKey());
LockSmCtx lockSmCtx = (LockSmCtx) ctx.getpCtx();
if (propose.getResult() == PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && lockSmCtx.getLockRet() == LockResult.SUCCESS) {
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} version {} groupid {}, acquire lock success ", acquireLockDO.getHost(), acquireLockDO.getPid(), acquireLockDO.getThreadID(), key, lockSmCtx.getFencingToken(), groupId);
LockOwner lockOwner = new LockOwner(acquireLockRequest.getHost(), acquireLockRequest.getThreadID(), acquireLockRequest.getPid(), lockSmCtx.getFencingToken());
LockClient lockClient = ClientManager.getInstance().createLockClient(key, groupId, lockContext.getChannel(), acquireLockRequest);
ClientManager.getInstance().addLockClient(key, lockClient, groupId, lockContext.getChannel());
// reply to the client that the lock was acquired
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, lockOwner, ResponseStatus.SUCCESS);
// add a lock-expiry check event, which detects expiry and triggers the release
expireStrategyFactory.addExpireEvent(new LockExpireEvent(lockSmCtx.getExpireTime(), acquireLockDO.getLockKey(),
groupId, lockSmCtx.getFencingToken(), acquireLockDO.getLockType(), acquireLockDO.getOpcode(), acquireLockDO.getHost(), acquireLockDO.getThreadID(), acquireLockDO.getPid()));
traceWorker.offer(new LockTrace(TimeUtil.getCurrentTimestamp(), ProtocolType.ACQUIRE_LOCK, IPUtil.getIpStr(acquireLockDO.getHost()), acquireLockDO.getThreadID(),
acquireLockRequest.getPid(), acquireLockRequest.getLockKey(), lockSmCtx.getFencingToken(), acquireLockRequest.getRegistryKey(), lockSmCtx.getExpireTime(), LockCodeEnum.Reentrant_Lock));
return true;
}
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
if (acquireLockRequest.isBlocked()) {
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} groupid {}, lock exist wait.", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, groupId);
LockClient lockClient = ClientManager.getInstance().createLockClient(key, groupId, lockContext.getChannel(), acquireLockRequest);
WatchEvent watchEvent = watchService.genWatchEvent(acquireLockRequest, lockClient, reentrantLockValue.getLockVersion());
// blocking acquisition: enqueue in the watch queue and wait for the release wakeup
watchService.addWatchEvent(key, watchEvent, groupId);
ClientManager.getInstance().addLockClient(key,lockClient,groupId,lockContext.getChannel());
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.LOCK_WAIT);
} else {
LOGGER.debug("ip :{} pid {} threadid {} acquire get lock key {} groupid {}, lock exist return occupied.", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, groupId);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.LOCK_OCCUPIED);
}
return false;
}
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} version {} groupid {}, lock is expire ,propose delete key.", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, reentrantLockValue.getLockVersion(), groupId);
proposeDeleteKey(acquireLockRequest, groupId, lockOwnerInfo);
lockNotify.lockNotifyExpired(key, new LockOwner(lockOwnerInfo.getIp(), lockOwnerInfo.getThreadId(), lockOwnerInfo.getPid()), groupId);
}
// reaching here, the lock is either free or expired
// weight comparison: the client with the higher priority is allowed to acquire
AcquireLockDO acquireLockDO = null;
boolean isFirstWatch;
WatchEvent watchEvent = watchService.fetchFirstAcquiredWatchEvent(key, groupId);
if (watchEvent != null && watchEvent.getWeight() >= acquireLockRequest.getWeight()) {
acquireLockDO = AcquireLockDO.fromWatchEvent(key, watchEvent, version);
isFirstWatch = true;
LOGGER.info("acquire lock by watch events : {}.", watchEvent.getLockClient());
} else {
acquireLockDO = AcquireLockDO.fromRequest(acquireLockRequest, version);
isFirstWatch = false;
}
SMCtx ctx = createCtx();
byte[] proposeMsg;
try {
proposeMsg = acquireLockDO.toBytes();
} catch (ProtocolException e) {
LOGGER.error("try acquire lock error", e);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
ProposeResult propose = paxosService.batchPropose(proposeMsg, groupId, ctx, acquireLockRequest.getRegistryKey());
LockSmCtx lockSmCtx = (LockSmCtx) ctx.getpCtx();
if (propose.getResult() == PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && lockSmCtx.getLockRet() == LockResult.SUCCESS) {
if (isFirstWatch) {
LockClient lockClient = watchEvent.getLockClient();
LockOwner lockOwner = new LockOwner(lockClient.getcHost(), lockClient.getcThreadID(), lockClient.getcPid(), lockSmCtx.getFencingToken());
lockNotify.lockNotifyUpdate(key, lockOwner, groupId);
watchService.removeWatchEvent(key,groupId,watchEvent);
if (acquireLockRequest.isBlocked()) {
LockClient newLockClient = ClientManager.getInstance().createLockClient(key, groupId, lockContext.getChannel(), acquireLockRequest);
WatchEvent newWatchEvent = watchService.genWatchEvent(acquireLockRequest, newLockClient, lockSmCtx.getFencingToken());
// blocking acquisition: enqueue in the watch queue and wait for the release wakeup
watchService.addWatchEvent(key, newWatchEvent, groupId);
ClientManager.getInstance().addLockClient(key, newLockClient, groupId, lockContext.getChannel());
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} groupid {}, lock wait", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, groupId);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.LOCK_WAIT);
} else {
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} groupid {}, lock occupied", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, groupId);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.LOCK_OCCUPIED);
}
traceWorker.offer(new LockTrace(TimeUtil.getCurrentTimestamp(), ProtocolType.ACQUIRE_LOCK, IPUtil.getIpStr(lockClient.getcHost()), lockClient.getcThreadID(),
lockClient.getcPid(), acquireLockRequest.getLockKey(), lockSmCtx.getFencingToken(), acquireLockRequest.getRegistryKey(), lockSmCtx.getExpireTime(), LockCodeEnum.Reentrant_Lock));
} else {
LOGGER.debug("ip :{} pid {} threadid {} acquire lock key {} version {} groupid {}, acquire lock success ", acquireLockDO.getHost(), acquireLockDO.getPid(), acquireLockDO.getThreadID(), key, lockSmCtx.getFencingToken(), groupId);
LockOwner lockOwner = new LockOwner(acquireLockRequest.getHost(), acquireLockRequest.getThreadID(), acquireLockRequest.getPid(), lockSmCtx.getFencingToken());
lockNotify.lockNotifyUpdate(key, lockOwner, groupId);
LockClient lockClient = ClientManager.getInstance().createLockClient(key, groupId, lockContext.getChannel(), acquireLockRequest);
ClientManager.getInstance().addLockClient(key, lockClient, groupId, lockContext.getChannel());
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, lockOwner, ResponseStatus.SUCCESS);
traceWorker.offer(new LockTrace(TimeUtil.getCurrentTimestamp(), ProtocolType.ACQUIRE_LOCK, IPUtil.getIpStr(acquireLockDO.getHost()), acquireLockDO.getThreadID(),
acquireLockRequest.getPid(), acquireLockRequest.getLockKey(), lockSmCtx.getFencingToken(), acquireLockRequest.getRegistryKey(), lockSmCtx.getExpireTime(), LockCodeEnum.Reentrant_Lock));
}
// add a lock-expiry check event, which detects expiry and triggers the release
expireStrategyFactory.addExpireEvent(new LockExpireEvent(lockSmCtx.getExpireTime(), acquireLockDO.getLockKey(),
groupId, lockSmCtx.getFencingToken(), acquireLockDO.getLockType(), acquireLockDO.getOpcode(), acquireLockDO.getHost(), acquireLockDO.getThreadID(), acquireLockDO.getPid()));
return true;
}
LOGGER.error("ip :{} pid {} threadid {} acquire lock key {} groupid {}, acquire lock failed ", acquireLockRequest.getHost(), acquireLockRequest.getPid(), acquireLockRequest.getThreadID(), key, groupId);
ackAcquireLock(lockContext.getChannel(), acquireLockRequest, null, ResponseStatus.ERROR);
return false;
}
Client-side acquisition logic
// LockService
// lockOption: the lock options, documented below
public AcquireLockResult tryAcquireLock(String lockkey, InternalLockOption lockOption) throws ParameterIllegalException {
AcquireLockResult result = new AcquireLockResult();
result.setRet(false);
long startTimestamp = System.currentTimeMillis();
lockParameterCheck(lockOption);
// lock info is recorded locally after acquisition; a reentrant acquisition returns true
if (this.lockManager.acquiredLockLocal(lockkey, lockOption)) {
/* acquired the lock locally */
LockContext lockContext = lockManager.getLocalLockContext(lockkey, lockOption.getThreadID(), lockOption.getLockType(), lockOption.getLockOpcode());
// renew the lock, refreshing the server-side expiration time
LockResult lockResult = renewLock(lockkey, lockContext.getLockVersion(), lockOption.getExpireTime(), lockOption.getThreadID(), lockOption.getLockType(), lockOption.getLockOpcode());
if (!lockResult.isSuccess()) {
this.lockManager.releaseLockLocal(lockkey, lockOption.getThreadID(), false, lockOption.getLockType(), lockOption.getLockOpcode());
}
result.setOwner(new LockOwner(InetAddressUtil.getIpInt(),lockOption.getThreadID(),lockOption.getPID()));
result.setResponseStatus(lockResult.getResponseStatus());
result.setRet(lockResult.isSuccess());
result.setLockVersion(lockContext.getLockVersion());
return result;
}
int timeout = (int) Math.min(this.wlockClient.getDefaultTimeoutForReq(), lockOption.getMaxWaitTime());
WatchEvent watchEvent = null;
// is this a blocking acquisition?
if (lockOption.isWaitAcquire()) {
watchEvent = new WatchEvent(lockkey, lockOption.getThreadID(),
lockOption.getWatchID(), WatchType.ACQUIRE, startTimestamp);
watchEvent.setLockOption(lockOption);
watchEvent.setTimeout(lockOption.getMaxWaitTime());
// register a WatchEvent locally
this.watchManager.registerWatchEvent(lockkey, watchEvent);
}
// compute which group this lockkey routes to
int groupId = this.wlockClient.getRegistryKey().getGroupId(lockkey);
AcquireLockRequest acquireLockReq = protocolFactory.createAcquireReq(lockkey, groupId, lockOption);
try {
// send the acquire request to the server
SendReqResult sendReqResult = this.serverPoolHandler.syncSendRequest(acquireLockReq, timeout, "tryAcquireLock " + lockkey);
if (sendReqResult != null) {
AcquireLockResponse resp = new AcquireLockResponse();
resp.fromBytes(sendReqResult.getData());
result.setResponseStatus(resp.getStatus());
if (resp.getStatus() == ResponseStatus.LOCK_WAIT) {
logger.debug(Version.INFO + ", tryAcquireLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey + ", threadID : " + lockOption.getThreadID());
// the server-side lock is occupied; this client blocks, waiting for the server's wakeup notification
NotifyEvent notifyEvent = this.watchManager.waitNotifyEvent(lockOption.getWatchID(), lockOption.getMaxWaitTime());
if (notifyEvent != null && notifyEvent.getEventType() == EventType.LOCK_ACQUIRED.getType()) {
// acquired: record the lock info locally
this.lockManager.updateLockLocal(lockkey, notifyEvent.getFencingToken(), lockOption, true);
EventCachedHandler.getInstance(wlockClient).unRegisterWatchEvent(lockkey, notifyEvent.getWatchID());
AcquireEvent acquireEvent = new AcquireEvent(lockkey, resp.getFencingToken(), lockOption, lockOption.getThreadID());
EventCachedHandler.getInstance(wlockClient).registerAcquireEvent(acquireEvent);
result.setRet(true);
// the lock version, used when solving the data-correctness problem
result.setLockVersion(notifyEvent.getFencingToken());
result.setOwner(new LockOwner(acquireLockReq.getHost(), acquireLockReq.getThreadID(), acquireLockReq.getPid()));
result.setResponseStatus(ResponseStatus.SUCCESS);
} else {
result.setRet(false);
logger.error(Version.INFO + ", tryAcquireLock blocked, timeout , lockkey : " + lockkey + ", timeout : " + lockOption.getMaxWaitTime());
}
return result;
} else if (resp.getStatus() == ResponseStatus.SUCCESS) {
// acquired: record the lock info locally
this.lockManager.updateLockLocal(lockkey, resp.getFencingToken(), lockOption, false);
result.setRet(true);
// the lock version, used when solving the data-correctness problem
result.setLockVersion(resp.getFencingToken());
result.setOwner(new LockOwner(resp.getOwnerHost(), resp.getThreadID(), resp.getPid()));
AcquireEvent acquireEvent = new AcquireEvent(lockkey, resp.getFencingToken(), lockOption, lockOption.getThreadID());
EventCachedHandler.getInstance(wlockClient).registerAcquireEvent(acquireEvent);
return result;
} else if (resp.getStatus() == ResponseStatus.TIMEOUT) {
result.setRet(false);
logger.error(Version.INFO + ", tryAcquireLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey + ", server : " + sendReqResult.getServer() + ", timeout : " + timeout);
} else {
result.setRet(false);
if (resp.getStatus() == ResponseStatus.LOCK_OCCUPIED) {
logger.debug(Version.INFO + ", tryAcquireLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey);
} else {
logger.info(Version.INFO + ", tryAcquireLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey);
}
return result;
}
}
} catch (Exception e) {
logger.error(Version.INFO + ", tryAcquireLock error.", e);
} finally {
// unregister the watchEvent
this.watchManager.unRegisterWatchEvent(lockkey, lockOption.getWatchID());
}
return result;
}
public class InternalLockOption {
/**
* 0: reentrant lock, 1: read-write lock; default is reentrant
*/
protected byte lockType = 0;
/**
* 0: none; read-write lock opcode: write 1, read 2; default 0
*/
protected byte lockOpcode = 0;
/**
* watch policy: watch once or watch continuously; default is once
*/
protected WatchPolicy watchPolicy = WatchPolicy.Once;
/**
* whether to block waiting until the lock is acquired
*/
protected boolean waitAcquire;
/**
* lock expiration in milliseconds; default 5 minutes, maximum 5 minutes, minimum 5 seconds
*/
protected int expireTime = Factor.LOCK_MAX_EXPIRETIME;
/**
* lock weight, default 1, range [1, 10]; the higher the weight, the higher the chance of acquiring the lock
*/
protected int weight = Factor.ACQUIRE_LOCK_MIN_WEIGHT;
/**
* maximum blocking wait time; default Long.MAX_VALUE
*/
protected long maxWaitTime = Factor.WATCH_MAX_WAIT_TIME_MARK;
/**
* auto-renewal interval in milliseconds (default Integer.MAX_VALUE, i.e. no auto renewal; minimum interval 1000ms; the maximum must not exceed the expiration time and is controlled by the business)
*/
protected int renewInterval = Factor.LOCK_NOT_RENEWINTERVAL;
/**
* renewal listener callback
*/
protected RenewListener renewListener;
/**
* lock-expiry listener callback
*/
protected LockExpireListener lockExpireListener;
/**
* watch event callback
*/
protected WatchListener watchListener;
/**
* long-held-lock callback
*/
protected HoldLockListener holdLockListener;
/**
* the actual expiration duration
*/
protected int realExpireMills;
/**
* the actual expiration timestamp
*/
protected long realExpireTimeStamp;
/**
* the lock version
*/
private long lockversion;
/**
* watch event ID
*/
private long watchID;
/**
* ID of the requesting thread
*/
private long threadID;
/**
* current process ID
*/
private int PID;
/**
* registry hash key
*/
private String registryKey;
/**
* whether automatic retry of the call is enabled
*/
private boolean autoRenewEnabled;
}
- Reentrancy analysis. Reentrancy is implemented on the client; after acquisition the lock info is stored locally
// LockManager
// reentrancy logic
public boolean acquiredLockLocal(String lockkey, InternalLockOption lockOption) {
// lock info lives in a map whose key is "lockkey:threadId"
String contextKey = lockkey + lockkey_threadID_seperator + lockOption.getThreadID();
// look up the local lock
LockContext lockContext = lockContextMap(lockOption.getLockType(), lockOption.getLockOpcode()).get(contextKey);
// non-null with a positive count means the lock was already acquired remotely
if (lockContext != null && lockContext.getAquiredCount() > 0) {
// hold count + 1
lockContext.incrAquiredCount();
return true;
}
// no local hold: must acquire remotely
return false;
}
- Release analysis. State machine: releasing deletes the lock info from rocksdb
// ReentrantLock
// release
public boolean releaseLock(ReleaseLockDO releaseLockDO, int groupIdx, LockSmCtx smCtx) {
String key = releaseLockDO.getLockKey();
Optional<ReentrantLockValue> lock = null;
try {
// look up the lock info
lock = lockRepositroy.getLock(key, groupIdx);
if (!lock.isPresent()) {
LOGGER.error("release lock {},but key not exist", key);
return true;
}
// validate the lock version, which the client sends along with the release; a safety check
if (lock.get().getLockVersion() != releaseLockDO.getFencingToken()) {
if (smCtx != null) {
smCtx.setLockRet(LockResult.TOKEN_ERROR);
}
return true;
}
// delete the lock info from rocksdb
lockRepositroy.deleteLock(key, groupIdx);
return true;
} catch (LockException e) {
LOGGER.error("{} release lock key : {} error.", releaseLockDO.getHost(), key);
LOGGER.error(e.getMessage(), e);
if (smCtx != null) {
smCtx.setLockRet(LockResult.EXCEPTION);
}
return false;
}
}
Server-side logic:
// ReentrantLockService
// release logic
public boolean tryReleaseLock(LockContext lockContext, int groupId) {
ReleaseLockRequest releaseLockRequest = new ReleaseLockRequest();
try {
releaseLockRequest.fromBytes(lockContext.getBuf());
} catch (ProtocolException e) {
LOGGER.error("try release lock error", e);
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.ERROR);
return false;
}
String key = releaseLockRequest.getLockKey();
Optional<ReentrantLockValue> lock = null;
long version = 0L;
try {
// look up the lock info
lock = lockRepository.getLock(key, groupId);
} catch (LockException e) {
LOGGER.error("{} release lock key : {} error.", releaseLockRequest.getHost(), key, e);
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.ERROR);
return false;
}
if (!lock.isPresent()) {
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, key not exist return lock deleted ", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId);
// the lock does not exist: reply to the client that it was already deleted
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.LOCK_DELETED);
// wake the clients waiting in the watch queue
trySnatchLock(key, groupId, version, releaseLockRequest.getRegistryKey());
return false;
}
ReentrantLockValue reentrantLockValue = lock.get();
LockOwnerInfo lockOwnerInfo = reentrantLockValue.getLockOwnerInfo();
// the server-side lock version
version = reentrantLockValue.getLockVersion();
// has the lock expired?
if (!lockOwnerInfo.isExpire()) {
// the lock has not expired
// check that the releasing client is the lock's holder
if (!lockOwnerInfo.equals(releaseLockRequest.getHost(), releaseLockRequest.getThreadID(), releaseLockRequest.getPid())) {
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, key exist and not itself return owner changed", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId);
LOGGER.debug("owner is ip :{} pid {} threadid {}", lockOwnerInfo.getIp(), lockOwnerInfo.getPid(), lockOwnerInfo.getThreadId());
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.LOCK_CHANGED_OWNER);
return false;
}
// check that the server-side lock version matches the client's
if (reentrantLockValue.getLockVersion() != releaseLockRequest.getFencingToken()) {
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, key exist and verison {} {} error", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId, releaseLockRequest.getFencingToken(), reentrantLockValue.getLockVersion());
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.TOKEN_ERROR);
return false;
}
SMCtx ctx = createCtx();
ReleaseLockDO releaseLockDO = ReleaseLockDO.fromRequest(releaseLockRequest);
byte[] proposeMsg;
try {
proposeMsg = releaseLockDO.toBytes();
} catch (ProtocolException e) {
LOGGER.error("try release lock error", e);
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.ERROR);
return false;
}
// all checks passed: propose the release, which runs the state machine's release logic
ProposeResult propose = paxosService.batchPropose(proposeMsg, groupId, ctx, releaseLockRequest.getRegistryKey());
LockSmCtx lockSmCtx = (LockSmCtx) ctx.getpCtx();
if (propose.getResult() == PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && lockSmCtx.getLockRet() == LockResult.SUCCESS) {
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, success", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId);
// release succeeded: reply to the client
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.SUCCESS);
LockClient lockClient = ClientManager.getInstance().getLockOwnerClient(key, new LockOwner(lockOwnerInfo.getIp(), lockOwnerInfo.getThreadId(), lockOwnerInfo.getPid()), groupId);
ClientManager.getInstance().removeLockClient(key, lockClient, groupId);
// wake the clients waiting in the watch queue
trySnatchLock(key, groupId, version, releaseLockRequest.getRegistryKey());
traceWorker.offer(new LockTrace(TimeUtil.getCurrentTimestamp(), ProtocolType.RELEASE_LOCK, IPUtil.getIpStr(lockOwnerInfo.getIp()), lockOwnerInfo.getThreadId(),
lockOwnerInfo.getPid(), releaseLockRequest.getLockKey(), reentrantLockValue.getLockVersion(), releaseLockRequest.getRegistryKey(), -1, LockCodeEnum.Reentrant_Lock));
return true;
}
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, failed", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId);
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.ERROR);
return false;
} else {
// the lock has expired
LOGGER.debug("ip :{} pid {} threadid {} release lock key {} groupid {}, key delete", releaseLockRequest.getHost(), releaseLockRequest.getPid(), releaseLockRequest.getThreadID(), key, groupId);
// reply to the client that the lock was deleted
ackReleaseLock(lockContext.getChannel(), releaseLockRequest, ResponseStatus.LOCK_DELETED);
// notify the holding client that the lock expired
lockNotify.lockNotifyExpired(key, new LockOwner(lockOwnerInfo.getIp(), lockOwnerInfo.getThreadId(), lockOwnerInfo.getPid()), groupId);
// delete the expired lock via a propose, running the state machine's delete logic
proposeDeleteKey(releaseLockRequest, groupId, lockOwnerInfo);
// wake the clients waiting in the watch queue
trySnatchLock(key, groupId, version, releaseLockRequest.getRegistryKey());
return false;
}
}
// ReentrantLockService
// wake the clients waiting in the watch queue
private void trySnatchLock(String key, int groupId, long version, String registryKey) {
// fetch the highest-priority WatchEvent from the watch queue
WatchEvent watchEvent = watchService.fetchFirstAcquiredWatchEvent(key, groupId);
if (watchEvent != null) {
LOGGER.debug("snatch lock,find first watchevent {}, lockClient {}.", watchEvent.getWatchID(), watchEvent.getLockClient());
AcquireLockDO acquireLockDO = AcquireLockDO.fromWatchEvent(key, watchEvent, version);
try {
SMCtx ctx1 = createCtx();
// propose an acquire, running the state machine's acquire logic (see the acquisition analysis)
ProposeResult proposeRes = paxosService.batchPropose(acquireLockDO.toBytes(), groupId, ctx1, registryKey);
LockSmCtx lockSmCtx = (LockSmCtx) ctx1.getpCtx();
if (proposeRes.getResult() == PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && lockSmCtx.getLockRet() == LockResult.SUCCESS) {
LockClient lockClient = watchEvent.getLockClient();
LockOwner lockOwner = new LockOwner(lockClient.getcHost(), lockClient.getcThreadID(), lockClient.getcPid(), lockSmCtx.getFencingToken());
// acquired: notify the client, waking it from its blocked state
lockNotify.lockNotifyUpdate(key, lockOwner, groupId);
expireStrategyFactory.addExpireEvent(new LockExpireEvent(lockSmCtx.getExpireTime(), acquireLockDO.getLockKey(),
groupId, lockSmCtx.getFencingToken(), acquireLockDO.getLockType(), acquireLockDO.getOpcode(), acquireLockDO.getHost(), acquireLockDO.getThreadID(), acquireLockDO.getPid()));
// remove the WatchEvent from the watch queue
watchService.removeWatchEvent(key, groupId, watchEvent);
traceWorker.offer(new LockTrace(TimeUtil.getCurrentTimestamp(), ProtocolType.WATCH_LOCK, IPUtil.getIpStr(lockClient.getcHost()), lockClient.getcThreadID(),
lockClient.getcPid(), acquireLockDO.getLockKey(), lockSmCtx.getFencingToken(), registryKey, lockSmCtx.getExpireTime(), LockCodeEnum.Reentrant_Lock));
}
} catch (ProtocolException e) {
LOGGER.error("trySnatchLock error.", e);
}
} else {
lockNotify.lockNotifyDelete(key, groupId);
}
}
Client-side release logic:
// LockService
// release
public LockResult releaseLock(String lockkey, long lockVersion, boolean forced, long threadID, int lockType, int opcode) {
int timeout = this.wlockClient.getDefaultTimeoutForReq();
// get the local lock info
LockContext lockContext = lockManager.getLocalLockContext(lockkey, threadID, lockType, opcode);
if (lockContext == null) {
// no local lock: finish
return new LockResult(false, ResponseStatus.LOCK_DELETED);
}
long ownerThreadID = threadID;
if (lockVersion == -1) {
lockVersion = lockContext.getLockVersion();
}
if (ownerThreadID == -1) {
ownerThreadID = lockContext.getLockOption().getThreadID();
}
// release the local lock: hold count - 1
int releaseRet = this.lockManager.releaseLockLocal(lockkey, ownerThreadID, forced, lockType, opcode);
// releaseRet > 0 means the hold count has not yet dropped to 0
if (releaseRet > LockManager.ReleaseRet.SUCCESS) {
return new LockResult(true, ResponseStatus.SUCCESS);
}
if (releaseRet == LockManager.ReleaseRet.FAIL) {
/* release failed */
return new LockResult(false, ResponseStatus.ERROR);
}
int groupId = this.wlockClient.getRegistryKey().getGroupId(lockkey);
ReleaseLockRequest releaseLockReq = protocolFactory.createReleaseLockReq(lockkey, groupId, this.registryKey, lockVersion, ownerThreadID, WLockClient.currentPid, lockType, opcode);
try {
// the local hold count reached 0: release the server-side lock
SendReqResult sendReqResult = this.serverPoolHandler.syncSendRequest(releaseLockReq, timeout, "releaseLock " + lockkey);
if (sendReqResult != null) {
ReleaseLockResponse resp = new ReleaseLockResponse();
resp.fromBytes(sendReqResult.getData());
if (resp.getStatus() == ResponseStatus.SUCCESS) {
EventCachedHandler.getInstance(wlockClient).unRegisterAcquireEvent(lockkey, ownerThreadID);
return new LockResult(true, ResponseStatus.SUCCESS);
} else if (resp.getStatus() == ResponseStatus.TIMEOUT) {
logger.error(Version.INFO + ", releaseLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey + ", server : " + sendReqResult.getServer() + ", timeout : " + timeout);
} else if (resp.getStatus() == ResponseStatus.LOCK_DELETED) {
logger.error(Version.INFO + ", releaseLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey + " version: " + lockVersion);
return new LockResult(true, resp.getStatus());
} else {
logger.error(Version.INFO + ", releaseLock status : " + ResponseStatus.toStr(resp.getStatus()) + ", lockkey : " + lockkey + " version: " + lockVersion);
}
return new LockResult(false, resp.getStatus());
}
} catch (Exception e) {
logger.error(Version.INFO + ", releaseLock error.", e);
}
return new LockResult(false, ResponseStatus.TIMEOUT);
}
- Automatic expiry release. The server has a detection mechanism for expired locks
class ExpireRunnable implements Runnable {
// after a successful acquisition an ExpireEvent is enqueued; the queue is a priority queue ordered by expiration time
private PriorityBlockingQueue<ExpireEvent> taskQueue;
public ExpireRunnable(PriorityBlockingQueue<ExpireEvent> taskQueue) {
this.taskQueue = taskQueue;
}
@Override
public void run() {
while (!isStop) {
try {
if (isPaused) {
// only the master node runs the detection; other nodes keep sleeping
Thread.sleep(SLEEP_INTERVAL);
continue;
}
// peek at the soonest expiry event
ExpireEvent expireEvent = taskQueue.peek();
if (null == expireEvent) {
Thread.sleep(SLEEP_INTERVAL);
} else {
long currentTime = TimeUtil.getCurrentTimestamp();
// has it expired?
if (expireEvent.getExpireTimestamp() <= currentTime) {
ExpireEvent hasExpiredEvent = taskQueue.poll(10, TimeUnit.MILLISECONDS);
// hand the expired event to expireTriggerProcessor, which performs the deletion
expireTriggerProcessor.offer(hasExpiredEvent);
} else {
// compute how long until it expires and sleep until then
Thread.sleep(Math.min(expireEvent.getExpireTimestamp() - currentTime, SLEEP_INTERVAL));
}
}
} catch (Exception e) {
logger.info("QueueAllDispatcherThread error", e);
}
}
}
}
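The ordering above relies on ExpireEvent comparing by expiration timestamp; a minimal sketch of such a queue (the class shape is assumed for illustration):
import java.util.concurrent.PriorityBlockingQueue;

class ExpireEventSketch implements Comparable<ExpireEventSketch> {
    final long expireTimestamp;
    final String lockKey;

    ExpireEventSketch(long expireTimestamp, String lockKey) {
        this.expireTimestamp = expireTimestamp;
        this.lockKey = lockKey;
    }

    @Override
    public int compareTo(ExpireEventSketch o) {
        // smaller timestamp first: the queue head is always the next lock to expire
        return Long.compare(expireTimestamp, o.expireTimestamp);
    }
}
// usage: new PriorityBlockingQueue<ExpireEventSketch>() pops events in expiration order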
- Automatic renewal, client-side implementation
// LockManager
// after acquisition the local lock is added, and auto-renewal is started if needed
public boolean updateLockLocal(String lockkey, long lockVersion, InternalLockOption lockOption, boolean needTouch) {
String contextKey = lockkey + lockkey_threadID_seperator + lockOption.getThreadID();
LockContext lockContext = new LockContext(lockkey, contextKey, lockVersion, 1, lockOption);
// refresh the expiration duration
lockContext.setExpireMills(lockOption.getExpireTime());
lockContextMap(lockOption.getLockType(), lockOption.getLockOpcode()).put(contextKey, lockContext);
updateExpireTime(lockContext, lockOption.getExpireTime());
lockContext.updateRealExpireTime();
if (needTouch) {
startTouchRenew(lockkey, lockContext, lockOption.getLockType(), lockOption.getLockOpcode());
} else if (lockOption.isAutoRenew() || lockOption.getRealExpireTimeStamp() > System.currentTimeMillis()) {
// auto-renewal starts when configured, or while the actual expiration timestamp still lies ahead
startAutoRenew(lockkey, lockContext, lockOption.getLockType(), lockOption.getLockOpcode());
}
return true;
}
// schedule a renewal task to run periodically
public void sheduleRenew(RenewLockTask renewTask, int interval) {
scheduledExecutorService.schedule(renewTask, interval, TimeUnit.MILLISECONDS);
}
- High availability. wlock is a paxos cluster architecture tolerating n - (n/2+1) node failures, and it avoids zookeeper's leader-crash unavailability: wlock's master is a logical master, requests are not forced through it, and if the master crashes a follower can also initiate proposes to handle requests. Below is how the client picks another node once it detects that the master is down
// ServerPool
// pick the target server for a request
public Server getTargetServer() {
Server targetServer = null;
synchronized (lock) {
// is the master node healthy?
if (master == null || !master.getState().equals(ServerState.Normal)) {
// the master is unhealthy: pick another node
logger.warn(Version.INFO + " master node is null, registry key : " + this.wlockClient.getRegistryKey().getRegistryKey());
if (servList.size() == 0) {
logger.error(Version.INFO + " server list is empty, no avaliable server, registry key : " + this.wlockClient.getRegistryKey().getRegistryKey());
} else {
int index = 0; // position of the master node
for (int i = 0; i < servList.size(); i++) {
Server server = this.servList.get(i);
if (server.getServConfig().isMaster()) {
index = i;
break;
}
}
for (int i = 0, len = servList.size(); i < len; i++) {
Server server = this.servList.get(index);
if (server.getState().equals(ServerState.Normal)) {
// found a healthy node; with the master down, all clients normally pick the same follower
targetServer = server;
logger.info("server reboot.get new server " + targetServer.toString());
break;
}
// scan starting from the position after the master
index = (++index) % (servList.size());
}
}
} else {
// the master is healthy: use it
targetServer = master;
}
}
return targetServer;
}
- Safety. wlock is implemented with the paxos consistency protocol, so the lock state on all nodes is consistent; the globally increasing lock version solves the data-correctness problem that redlock cannot
High performance, full feature set, high safety, high availability
Compared with the Redis-based implementations the response time is slightly longer, though still at the microsecond level
wlock is an implementation designed specifically for distributed locking: high performance, high availability, high safety, and a full feature set make it a true all-rounder. It supports data-correctness scenarios, high-concurrency scenarios, and high-availability scenarios well. If you are especially sensitive to response time and your safety requirements are modest, a Redis-based implementation is recommended; in all other cases wlock is an excellent choice.