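A lease is a protocol widely used in distributed systems, mainly to maintain consistency.

Leases were originally proposed to solve cache consistency. A lease is essentially a _contract_: the server grants a _client_ the right to _control modifications_ to some data for a _limited period_. _If the server wants to modify the data, it must first obtain the consent of every client that holds a lease on that data; only then may it make the change._ A client usually acquires a lease at the same time as it reads data from the server. During the lease term, if it has received no modification request from the server, its cached copy is guaranteed to be current. If it receives a modification request during the term and approves it, it must discard the cached copy. Once the lease has expired, a client that still wants to read from its cache must acquire the lease again, an operation we call renewal.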
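The read path just described can be sketched as a small client-side cache. This is only an illustration under simple assumptions, not code from any real system: the class LeasedCacheSketch, its fields, the explicit clock argument and the fetchFromServer supplier are all hypothetical names introduced here.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a client-side cache entry guarded by a lease.
final class LeasedCacheSketch {
    private String cachedValue;   // locally cached copy of the data
    private long leaseExpiresAt;  // absolute expiry time in ms; 0 means "no lease"

    // Read through the cache. nowMs is passed in so the example is deterministic.
    // fetchFromServer stands in for an RPC that returns the data and grants a lease.
    String read(long nowMs, long leaseTermMs, Supplier<String> fetchFromServer) {
        if (cachedValue == null || nowMs >= leaseExpiresAt) {
            // No valid lease: contact the server again (the "renewal" in the text).
            cachedValue = fetchFromServer.get();
            leaseExpiresAt = nowMs + leaseTermMs;
        }
        return cachedValue;
    }

    // Called when the server asks for consent to modify the data.
    // Approving means dropping the cached copy and giving up the lease.
    boolean approveModification() {
        cachedValue = null;
        leaseExpiresAt = 0L;
        return true;
    }
}
```

While the lease is valid the client never contacts the server, which is where the saving comes from; the price is that the server must ask before it modifies the data.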

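Lease characteristics

An important attribute of a lease is its term. In general, a relatively short term should be chosen. Compared with long leases, short leases have three advantages. First, when a client fails, a modification usually has to wait for that client's lease to expire, so a short lease means a shorter delay after failures. Second, even if a client no longer needs to read the data, every modification must still seek its consent until its lease expires. This is called "false sharing", and the longer the term, the worse it gets. Third, short leases reduce the amount of client state the server has to maintain. However, short leases also mean more renewal overhead, so for data that is read repeatedly but rarely modified, long leases are more efficient. Choosing the term therefore means weighing failure delay, false-sharing overhead and renewal overhead against one another, and the server can set terms flexibly according to the data's access pattern and the nature of the client. In fact, a term of zero is equivalent to polling: modifications can proceed at any time, but every read has to contact the server. An infinitely long term is equivalent to callbacks.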

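Besides the choice of term, there are many other management options. A client can choose whether to renew, when to renew, and whether to approve a modification. For example, to reduce read latency a client can renew before the lease expires, although this may add load on the server. A server can choose whether to grant a lease, how much data a lease covers, and how to carry out a modification. For example, after receiving a modification request the server may skip asking clients for consent and simply wait for all leases to expire (granting no new leases while it waits, to avoid an unbounded delay). For "installed files", that is, files that are modified very rarely (such as header files and library files), the server can cover a whole batch of files with a single lease and periodically broadcast a renewal notice to save overhead. When such data needs to be modified, the server just stops broadcasting and waits for the lease to expire.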
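The "wait for all leases to expire" option can be sketched as follows. This is a minimal model under stated assumptions: WaitForExpirySketch and its methods are hypothetical, time is passed in explicitly, and a single data item is assumed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical server-side sketch: defer a write until all outstanding leases expire.
final class WaitForExpirySketch {
    private final Map<String, Long> leaseExpiry = new HashMap<>(); // client -> expiry (ms)
    private boolean writePending = false;

    // Returns true if a lease was granted. No new leases are granted while a write
    // is waiting, otherwise the write could be delayed indefinitely.
    boolean grantLease(String client, long nowMs, long termMs) {
        if (writePending) {
            return false;
        }
        leaseExpiry.put(client, nowMs + termMs);
        return true;
    }

    // Registers a pending write and returns the earliest time at which it may proceed.
    long requestWrite(long nowMs) {
        writePending = true;
        long latest = nowMs;
        for (long expiry : leaseExpiry.values()) {
            latest = Math.max(latest, expiry);
        }
        return latest;
    }

    // Applies the pending write if every lease has expired; returns whether it was applied.
    boolean tryApplyWrite(long nowMs) {
        if (!writePending) {
            return false;
        }
        for (long expiry : leaseExpiry.values()) {
            if (expiry > nowMs) {
                return false;
            }
        }
        leaseExpiry.clear();
        writePending = false;
        return true;
    }
}
```

The write is delayed by at most one lease term, which is one more reason to keep terms short.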

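The definition of a lease often seems vague: sometimes a lease behaves like a heartbeat, and sometimes like a lock (a read lock or a write lock). So what is a lease, fundamentally?
Go back to the original definition: a lease is an agreement that grants its holder specific rights for a limited period. In my view the time limit is the defining property, and it is exactly what lets leases tolerate machine failures and network partitions. Within the term, a lease is simply an agreement between server and client, and the content of that agreement can vary widely.
If the agreement is that the server confirms the client is still alive, the lease acts as a heartbeat.
If the agreement is that the server guarantees the content will not be modified, the lease acts as a read lock.
If the agreement is that the server guarantees the content can be modified only by this client, the lease acts as a write lock.
This flexibility and fault tolerance make leases an effective tool for maintaining consistency in distributed systems.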

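Leases in HDFS: write locks

HDFS follows a write-once-read-many model, which means it does not support concurrent writes to a file; the mutual exclusion this requires is implemented with leases. Put simply, a Lease is a lock with a time limit. A client must obtain a Lease before it writes a file, and only the client holding that lease may add blocks to the file.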

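The classes related to leases are:

Server side:
LeaseManager: manages the leases for files being written
LeaseManager.Monitor: checks whether leases have expired (mainly against hardLimit)
LeaseManager.Lease: the lease entity, which manages all the write locks held by one client

Client side:
LeaseRenewer: the class that renews leases on the client
The internal structure of each class is outlined below.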

Lease

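Lease is an inner class of LeaseManager. Each instance represents one lease and contains the holder, the lease's timing information, and the files the lease covers.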

class Lease implements Comparable<Lease> {
  // Lease holder (the name of the client that holds the lease)
  private final String holder;
  // Time of the last renewal
  private long lastUpdate;
  // Files covered by this lease (all files the holding client has open for write)
  private final Collection<String> paths = new TreeSet<String>();
}

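Each client has one lease. A client can write many files at the same time, and those files are recorded in paths. The lease holds the write permission for all of them and renews them together rather than file by file. When the client is done with a file, the file is simply removed from paths, and once paths is empty the lease is reclaimed.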

LeaseManager

LeaseManager is the class that manages leases. Internally it maintains three collections (leases, sortedLeases and sortedLeasesByPath) and two variables (softLimit and hardLimit).

// The soft limit is the lease timeout that applies while a file is being written; default 60 s
private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
// The hard limit forcibly reclaims a lease that was not released when the file was closed; default 1 h
private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;
//
// Used for handling lock-leases
// Mapping: leaseHolder -> Lease
// Maps each lease holder to its lease
private final SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
// Set of: Lease
// Holds every lease issued by the NameNode
private final SortedSet<Lease> sortedLeases = new TreeSet<Lease>();

//
// Map path names to leases. It is protected by the sortedLeases lock.
// The map stores pathnames in lexicographical order.
// Maps each path to its lease
private final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();

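Within the softLimit period, the client has exclusive access to the file, and no other client can take away its exclusive right to write it.
After softLimit expires, any other client may reclaim the lease, obtain the lease on the file for itself, and thereby gain exclusive write access.
After hardLimit expires, the NameNode forcibly closes the file and revokes the lease.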
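The three stages above can be summarized by a small classification function. This is a sketch, not Hadoop code: LeaseLimitsSketch, State and classify are hypothetical names, and the constants simply repeat the default values quoted in the listing above.

```java
// Hypothetical sketch of the two expiry checks; constants repeat the defaults quoted above.
final class LeaseLimitsSketch {
    static final long SOFT_LIMIT_MS = 60_000L;        // 60 s
    static final long HARD_LIMIT_MS = 60L * 60_000L;  // 1 h

    enum State { EXCLUSIVE, SOFT_EXPIRED, HARD_EXPIRED }

    // Classifies a lease by how long ago it was last renewed.
    static State classify(long lastUpdateMs, long nowMs) {
        long idle = nowMs - lastUpdateMs;
        if (idle > HARD_LIMIT_MS) {
            return State.HARD_EXPIRED;  // the NameNode closes the file and revokes the lease
        }
        if (idle > SOFT_LIMIT_MS) {
            return State.SOFT_EXPIRED;  // another client may take the lease over
        }
        return State.EXCLUSIVE;         // the holder keeps exclusive write access
    }
}
```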

sortedLeases holds every lease issued by the NameNode, ordered by time. When Monitor checks hardLimit, it only has to take leases from sortedLeases in order and check each one.
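Why the time ordering helps can be seen in a small sketch. The class TimedLeaseSketch below is a hypothetical stand-in for the lease entity, and the comparison rule (last update time first, then holder name as a tie-break) is an assumption made for illustration.

```java
import java.util.TreeSet;

// Hypothetical stand-in for a lease, ordered by last update time.
final class TimedLeaseSketch implements Comparable<TimedLeaseSketch> {
    final String holder;
    long lastUpdate;

    TimedLeaseSketch(String holder, long lastUpdate) {
        this.holder = holder;
        this.lastUpdate = lastUpdate;
    }

    @Override
    public int compareTo(TimedLeaseSketch o) {
        int byTime = Long.compare(lastUpdate, o.lastUpdate);
        // Tie-break on holder so two leases updated at the same instant both fit in the set.
        return byTime != 0 ? byTime : holder.compareTo(o.holder);
    }

    // Counts leases past the hard limit, stopping at the first one that is still valid.
    static int countHardExpired(TreeSet<TimedLeaseSketch> sorted, long nowMs, long hardLimitMs) {
        int expired = 0;
        for (TimedLeaseSketch lease : sorted) {
            if (nowMs - lease.lastUpdate <= hardLimitMs) {
                break; // every later element was renewed more recently
            }
            expired++;
        }
        return expired;
    }
}
```

Because the oldest lease comes first, the scan can stop at the first lease that has not expired instead of visiting every lease.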

Monitor

Monitor is a Runnable whose job is to detect leases that have exceeded hardLimit. Its run method calls LeaseManager.checkLeases to do the check, once every 2 s.

LeaseRenewer

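LeaseRenewer is the client-side class that keeps the client's leases renewed. It runs a thread that watches the softLimit deadline: the loop in LeaseRenewer.run() wakes up every 1 s and renews the leases once half of the softLimit period has elapsed since the last renewal.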
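The timing rule can be checked with a simulated clock. This is a sketch under assumptions: RenewTimingSketch and countRenewals are hypothetical, renewals always succeed, and the renewal threshold is taken to be exactly half of the soft limit.

```java
// Hypothetical sketch of the renewer's timing rule, driven by a simulated clock.
final class RenewTimingSketch {
    // Counts the renewals in [0, durationMs] when the loop wakes every sleepMs and
    // renews once at least softLimitMs / 2 has elapsed since the last renewal.
    static int countRenewals(long durationMs, long sleepMs, long softLimitMs) {
        long renewalTime = softLimitMs / 2;
        long lastRenewed = 0L;
        int renewals = 0;
        for (long now = 0L; now <= durationMs; now += sleepMs) {
            if (now - lastRenewed >= renewalTime) {
                renewals++;          // stands in for the renewal RPC
                lastRenewed = now;
            }
        }
        return renewals;
    }
}
```

With a 60 s soft limit and a 1 s wake-up interval, renewals happen about every 30 s, so one missed renewal still leaves time for a retry before the soft limit is reached.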

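LeaseRenewer is a singleton created through a factory. "Singleton" here means one LeaseRenewer per user (per NameNode), whereas on the server side each lease corresponds to one DFSClient. Because one user may create several DFSClient instances, LeaseRenewer keeps them in a list attribute named dfsclients.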

class LeaseRenewer {
  static final Log LOG = LogFactory.getLog(LeaseRenewer.class);

  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

  /** Get a {@link LeaseRenewer} instance */
  // Static entry point: asks the factory for the LeaseRenewer that belongs to this user
  static LeaseRenewer getInstance(final String authority,
      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
    r.addClient(dfsc);
    return r;
  }

  /**
   * A factory for sharing {@link LeaseRenewer} objects
   * among {@link DFSClient} instances
   * so that there is only one renewer per authority per user.
   */
  // Factory class that instantiates LeaseRenewer
  private static class Factory {
    private static final Factory INSTANCE = new Factory();
    // A LeaseRenewer instance is uniquely identified by authority (NameNode info)
    // and ugi (user info), i.e. one LeaseRenewer per user
    private static class Key {
      /** Namenode info */
      final String authority;
      /** User info */
      final UserGroupInformation ugi;

      private Key(final String authority, final UserGroupInformation ugi) {
        if (authority == null) {
          throw new HadoopIllegalArgumentException("authority == null");
        } else if (ugi == null) {
          throw new HadoopIllegalArgumentException("ugi == null");
        }
        this.authority = authority;
        this.ugi = ugi;
      }
      ...
    }

    /** A map for per user per namenode renewers. */
    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();

    /** Get a renewer. */
    private synchronized LeaseRenewer get(final String authority,
        final UserGroupInformation ugi) {
      final Key k = new Key(authority, ugi);
      LeaseRenewer r = renewers.get(k);
      if (r == null) {
        r = new LeaseRenewer(k);
        renewers.put(k, r);
      }
      return r;
    }

    /** Remove the given renewer. */
    private synchronized void remove(final LeaseRenewer r) {
      final LeaseRenewer stored = renewers.get(r.factorykey);
      //Since a renewer may expire, the stored renewer can be different.
      if (r == stored) {
        if (!r.clientsRunning()) {
          renewers.remove(r.factorykey);
        }
      }
    }
  }
  // The constructor is private, as usual for a singleton
  private LeaseRenewer(Factory.Key factorykey) {
    this.factorykey = factorykey;
    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
    ...
  }
}

Lease as a write lock: workflow

Adding a write lease

The lease that acts as a write lock is requested from the NameNode together with the client's write request (for the write path itself, see the earlier article [HDFS write解析](http://bigdatadecode.top/HDFS write解析.html)). The client's request to the NameNode goes through the following call chain:
FileSystem.create() --> DistributedFileSystem.create() --> FileSystemLinkResolver.resolve() --> doCall() --> dfs.create() --> DFSOutputStream.newStreamForCreate() --> dfsClient.namenode.create() --> namesystem.startFile() --> startFileInt() --> startFileInternal(). In startFileInternal, the statement newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine); adds a file to the NameNode, and leaseManager.addLease(newNode.getFileUnderConstructionFeature().getClientName(), src); records the lease that clientname holds on src. This addLease call is where the client obtains its lease. Its logic is:

// LeaseManager.class
synchronized Lease addLease(String holder, String src) {
  Lease lease = getLease(holder);
  if (lease == null) {
    lease = new Lease(holder);
    leases.put(holder, lease);
    sortedLeases.add(lease);
  } else {
    renewLease(lease);
  }
  sortedLeasesByPath.put(src, lease);
  lease.paths.add(src);
  return lease;
}

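On the NameNode, one Lease corresponds to one DFSClient (a DFSClient is an object constructed from a ugi, not the client machine of the Hadoop cluster). A Lease is identified by its holder, whose value is DFSClient.clientName. clientName is initialized in the DFSClient constructor: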

taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
    DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();

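clientName is built from taskId, a random number and the current thread's id, so each DFSClient instance gets a different clientName and therefore a different Lease.

addLease first looks in LeaseManager.leases (the holder-to-lease map) for a lease that belongs to holder. If none exists, LeaseManager creates one; if one exists, it is renewed. LeaseManager creates a lease by instantiating the Lease class, whose constructor is: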

private Lease(String holder) {
  this.holder = holder;
  renew();
}

private void renew() {
  this.lastUpdate = now();
}

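After the lease has been created, it is registered in LeaseManager's three collections, and the path it covers is added to the lease's paths.

This completes the addition of the lease.

Client-side renewal

Inside dfs.create(), the client calls beginFileLease(), which starts lease renewal for the file.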

private void beginFileLease(final long inodeId, final DFSOutputStream out)
    throws IOException {
  getLeaseRenewer().put(inodeId, out, this);
}

Client-side renewal is implemented by LeaseRenewer, which is instantiated from authority (the NameNode information) and ugi (the user information).

// DFSClient.class
public LeaseRenewer getLeaseRenewer() throws IOException {
  return LeaseRenewer.getInstance(authority, ugi, this);
}
// LeaseRenewer.class
static LeaseRenewer getInstance(final String authority,
    final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
  final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
  r.addClient(dfsc);
  return r;
}
// LeaseRenewer.Factory.class
private synchronized LeaseRenewer get(final String authority,
    final UserGroupInformation ugi) {
  final Key k = new Key(authority, ugi);
  LeaseRenewer r = renewers.get(k);
  if (r == null) {
    r = new LeaseRenewer(k);
    renewers.put(k, r);
  }
  return r;
}

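LeaseRenewer is instantiated through Factory. Factory first looks in renewers for a LeaseRenewer that belongs to the current user. If there is none it creates one; otherwise it returns the existing one. getInstance then adds the DFSClient instance dfsc to the LeaseRenewer's dfsclients list. At this point the user's LeaseRenewer object is fully initialized.

Next, put is called to hand the file's id, the corresponding output stream and the DFSClient instance to the LeaseRenewer: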

synchronized void put(final long inodeId, final DFSOutputStream out,
    final DFSClient dfsc) {
  if (dfsc.isClientRunning()) {
    // Check whether the daemon is running, or whether dfsclients
    // has been empty for longer than gracePeriod.
    // If the daemon is not running, or that time has passed, start a new daemon thread
    if (!isRunning() || isRenewerExpired()) {
      //start a new daemon with a new id.
      final int id = ++currentId;
      daemon = new Daemon(new Runnable() {
        @Override
        public void run() {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " started");
            }
            LeaseRenewer.this.run(id);
          } catch(InterruptedException e) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                  + " is interrupted.", e);
            }
          } finally {
            synchronized(LeaseRenewer.this) {
              Factory.INSTANCE.remove(LeaseRenewer.this);
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " exited");
            }
          }
        }

        @Override
        public String toString() {
          return String.valueOf(LeaseRenewer.this);
        }
      });
      daemon.start();
    }
    dfsc.putFileBeingWritten(inodeId, out);
    emptyTime = Long.MAX_VALUE;
  }
}

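put starts a daemon thread when one is needed. The daemon calls LeaseRenewer.run, which checks the lease and then renews it; the check is against softLimit. A new daemon thread is created only when no daemon is running, or when dfsclients has been empty for longer than gracePeriod.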

private void run(final int id) throws InterruptedException {
  for(long lastRenewed = Time.now(); !Thread.interrupted();
      Thread.sleep(getSleepPeriod())) {
    final long elapsed = Time.now() - lastRenewed;
    // Has more than half of softLimit elapsed?
    if (elapsed >= getRenewalTime()) {
      try {
        // Renew the lease
        renew();
        ...
        // Record the time of this renewal
        lastRenewed = Time.now();
      } catch (SocketTimeoutException ie) {
        ...
        break;
      } catch (IOException ie) {
        ...
      }
    }
    ...
  }
}

run calls renew() to renew the lease. The renewal covers every DFSClient of the current user, that is, every Lease the current user holds.

private void renew() throws IOException {
  final List<DFSClient> copies;
  synchronized(this) {
    copies = new ArrayList<DFSClient>(dfsclients);
  }
  //sort the client names for finding out repeated names.
  Collections.sort(copies, new Comparator<DFSClient>() {
    @Override
    public int compare(final DFSClient left, final DFSClient right) {
      return left.getClientName().compareTo(right.getClientName());
    }
  });
  String previousName = "";
  for(int i = 0; i < copies.size(); i++) {
    final DFSClient c = copies.get(i);
    //skip if current client name is the same as the previous name.
    if (!c.getClientName().equals(previousName)) {
      // Renew the lease
      if (!c.renewLease()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Did not renew lease for client " +
              c);
        }
        continue;
      }
      previousName = c.getClientName();
      ...
    }
  }
}

renew first sorts the DFSClient objects in dfsclients, mainly so that duplicate clientName values end up next to each other. Only one renewal is then sent per clientName, by calling c.renewLease:
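The sort-then-skip idea can be isolated in a few lines. This is a simplified sketch: DedupRenewSketch and namesToRenew are hypothetical names, plain strings stand in for DFSClient objects, and every renewal is assumed to succeed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: renew each distinct client name only once.
final class DedupRenewSketch {
    // Returns the names that would actually be renewed, one per distinct name.
    static List<String> namesToRenew(List<String> clientNames) {
        List<String> copies = new ArrayList<>(clientNames); // work on a copy of the shared list
        Collections.sort(copies);                           // duplicates become adjacent
        List<String> renewed = new ArrayList<>();
        String previousName = "";
        for (String name : copies) {
            if (!name.equals(previousName)) {
                renewed.add(name);   // stands in for the renewal RPC
                previousName = name;
            }
        }
        return renewed;
    }
}
```

Sorting first means a single remembered name is enough to detect repeats, with no extra set required.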

boolean renewLease() throws IOException {
  if (clientRunning && !isFilesBeingWrittenEmpty()) {
    try {
      // RPC that ends up in LeaseManager.renewLease
      namenode.renewLease(clientName);
      updateLastLeaseRenewal();
      return true;
    } catch (IOException e) {
      // Abort if the lease has already expired.
      final long elapsed = Time.now() - getLastLeaseRenewal();
      if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
        LOG.warn("Failed to renew lease for " + clientName + " for "
            + (elapsed/1000) + " seconds (>= hard-limit ="
            + (HdfsConstants.LEASE_HARDLIMIT_PERIOD/1000) + " seconds.) "
            + "Closing all files being written ...", e);
        closeAllFilesBeingWritten(true);
      } else {
        // Let the lease renewer handle it and retry.
        throw e;
      }
    }
  }
  return false;
}

renewLease makes a remote call that ends in LeaseManager.renewLease. The call chain is NameNodeRpcServer.renewLease --> FSNamesystem.renewLease --> LeaseManager.renewLease(holder)

// LeaseManager.class
synchronized void renewLease(String holder) {
  renewLease(getLease(holder));
}
synchronized void renewLease(Lease lease) {
  if (lease != null) {
    sortedLeases.remove(lease);
    lease.renew();
    sortedLeases.add(lease);
  }
}
// LeaseManager.Lease.class
private void renew() {
  this.lastUpdate = now();
}

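The client renews through LeaseRenewer, which ends up calling LeaseManager.renewLease. The renewal logic first gets the lease for clientName from leases, then removes it from sortedLeases, calls lease.renew to update its lastUpdate, and finally puts it back into sortedLeases. The leases in sortedLeases are ordered by lastUpdate. This completes the client-side renewal flow.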
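The remove, update, re-insert sequence matters because a sorted set locates elements by their sort key, so changing the key of an element that is still inside the set would leave it in the wrong position. A minimal sketch, with ResortOnRenewSketch and Entry as hypothetical names and the ordering rule assumed to be last update time followed by holder name:

```java
import java.util.TreeSet;

// Hypothetical sketch of renewing an entry that lives in a set sorted by time.
final class ResortOnRenewSketch {
    static final class Entry implements Comparable<Entry> {
        final String holder;
        long lastUpdate;

        Entry(String holder, long lastUpdate) {
            this.holder = holder;
            this.lastUpdate = lastUpdate;
        }

        @Override
        public int compareTo(Entry o) {
            int byTime = Long.compare(lastUpdate, o.lastUpdate);
            return byTime != 0 ? byTime : holder.compareTo(o.holder);
        }
    }

    static void renew(TreeSet<Entry> sorted, Entry lease, long nowMs) {
        sorted.remove(lease);      // take it out while its key is still the old value
        lease.lastUpdate = nowMs;  // change the sort key
        sorted.add(lease);         // re-insert at its new position
    }
}
```

After a renewal the entry moves to the end of the set, so the first element is always the lease that has gone longest without renewal.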

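Periodic lease checks on the NameNode

On the NameNode, LeaseManager.Monitor periodically checks whether leases have expired; the check is against hardLimit. Monitor's logic is fairly simple and lives mostly in its run method. Monitor is a Runnable, and when the active NameNode starts it calls leaseManager.startMonitor() to start a Monitor daemon thread. Its run method is: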

public void run() {
  for(; shouldRunMonitor && fsnamesystem.isRunning(); ) {
    boolean needSync = false;
    try {
      fsnamesystem.writeLockInterruptibly();
      try {
        if (!fsnamesystem.isInSafeMode()) {
          // Check for expired leases
          needSync = checkLeases();
        }
      } finally {
        fsnamesystem.writeUnlock();
        // lease reassignments should be sync'ed.
        if (needSync) {
          fsnamesystem.getEditLog().logSync();
        }
      }
      // Sleep NAMENODE_LEASE_RECHECK_INTERVAL before the next check; default 2000 ms
      Thread.sleep(HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
    } catch(InterruptedException ie) {
      ...
    }
  }
}

run calls checkLeases to perform the hardLimit check:

// LeaseManager.class
private synchronized boolean checkLeases() {
  boolean needSync = false;
  assert fsnamesystem.hasWriteLock();
  for(; sortedLeases.size() > 0; ) {
    // Take the first lease from sortedLeases,
    // i.e. the one that has gone longest without renewal
    final Lease oldest = sortedLeases.first();
    // Check whether the lease has exceeded hardLimit.
    // If it has not, return; if it has, skip the if and continue below.
    // The test is: now() - lastUpdate > hardLimit
    if (!oldest.expiredHardLimit()) {
      return needSync;
    }

    LOG.info(oldest + " has expired hard limit");

    final List<String> removing = new ArrayList<String>();
    // need to create a copy of the oldest lease paths, because
    // internalReleaseLease() removes paths corresponding to empty files,
    // i.e. it needs to modify the collection being iterated over
    // causing ConcurrentModificationException
    String[] leasePaths = new String[oldest.getPaths().size()];
    oldest.getPaths().toArray(leasePaths);
    // Process the paths of the lease that exceeded hardLimit
    for(String p : leasePaths) {
      try {
        // Release the file held by the lease that exceeded hardLimit
        boolean completed = fsnamesystem.internalReleaseLease(oldest, p,
            HdfsServerConstants.NAMENODE_LEASE_HOLDER);
        ...
        // If a lease recovery happened, we need to sync later.
        if (!needSync && !completed) {
          needSync = true;
        }
      } catch (IOException e) {
        LOG.error("Cannot release the path " + p + " in the lease "
            + oldest, e);
        removing.add(p);
      }
    }

    for(String p : removing) {
      removeLease(oldest, p);
    }
  }
  return needSync;
}

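Lease release, reclamation and recovery

Leases that exceed hardLimit are released. Releasing a lease is not a simple remove: the logic is fairly involved, and some cases require block recovery. It is implemented in internalReleaseLease: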

boolean internalReleaseLease(Lease lease, String src,
    String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
    IOException, UnresolvedLinkException {
  LOG.info("Recovering " + lease + ", src=" + src);
  ...
  // Look up the INodeFile for src
  final INodesInPath iip = dir.getLastINodeInPath(src);
  final INodeFile pendingFile = iip.getINode(0).asFile();
  int nrBlocks = pendingFile.numBlocks();
  BlockInfo[] blocks = pendingFile.getBlocks();

  int nrCompleteBlocks;
  BlockInfo curBlock = null;
  // Find the first block of this file that is not complete
  for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
    curBlock = blocks[nrCompleteBlocks];
    if(!curBlock.isComplete())
      break;
    assert blockManager.checkMinReplication(curBlock) :
        "A COMPLETE block is not minimally replicated in " + src;
  }

  // If there are no incomplete blocks associated with this file,
  // then reap lease immediately and close the file.
  // All blocks are complete: close the file and release the lease right away
  if(nrCompleteBlocks == nrBlocks) {
    finalizeINodeFileUnderConstruction(src, pendingFile,
        iip.getLatestSnapshotId());
    NameNode.stateChangeLog.warn("BLOCK*"
        + " internalReleaseLease: All existing blocks are COMPLETE,"
        + " lease removed, file closed.");
    return true; // closed!
  }
  // Only the last and the penultimate blocks may be in non COMPLETE state.
  // If the penultimate block is not COMPLETE, then it must be COMMITTED.
  // If an incomplete block exists, it can only be the last or the penultimate block.
  // If it is the penultimate block, that block must be in the COMMITTED state.
  // Otherwise some other block is incomplete: throw an exception, which checkLeases catches.
  if(nrCompleteBlocks < nrBlocks - 2 ||
     nrCompleteBlocks == nrBlocks - 2 &&
       curBlock != null &&
       curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
    final String message = "DIR* NameSystem.internalReleaseLease: "
        + "attempt to release a create lock on "
        + src + " but file is already closed.";
    NameNode.stateChangeLog.warn(message);
    throw new IOException(message);
  }

  // The last block is not COMPLETE, and
  // that the penultimate block if exists is either COMPLETE or COMMITTED
  final BlockInfo lastBlock = pendingFile.getLastBlock();
  BlockUCState lastBlockState = lastBlock.getBlockUCState();
  BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();

  // If penultimate block doesn't exist then its minReplication is met
  boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
      blockManager.checkMinReplication(penultimateBlock);

  switch(lastBlockState) {
  case COMPLETE:
    assert false : "Already checked that the last block is incomplete";
    break;
  case COMMITTED:
    // Close file if committed blocks are minimally replicated
    if(penultimateBlockMinReplication &&
        blockManager.checkMinReplication(lastBlock)) {
      finalizeINodeFileUnderConstruction(src, pendingFile,
          iip.getLatestSnapshotId());
      NameNode.stateChangeLog.warn("BLOCK*"
          + " internalReleaseLease: Committed blocks are minimally replicated,"
          + " lease removed, file closed.");
      return true; // closed!
    }
    // Cannot close file right now, since some blocks
    // are not yet minimally replicated.
    // This may potentially cause infinite loop in lease recovery
    // if there are no valid replicas on data-nodes.
    String message = "DIR* NameSystem.internalReleaseLease: " +
        "Failed to release lease for file " + src +
        ". Committed blocks are waiting to be minimally replicated." +
        " Try again later.";
    NameNode.stateChangeLog.warn(message);
    throw new AlreadyBeingCreatedException(message);
  case UNDER_CONSTRUCTION:
  case UNDER_RECOVERY:
    final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock;
    // setup the last block locations from the blockManager if not known
    if (uc.getNumExpectedLocations() == 0) {
      uc.setExpectedLocations(blockManager.getStorages(lastBlock));
    }

    if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
      // There is no datanode reported to this block.
      // may be client have crashed before writing data to pipeline.
      // This blocks doesn't need any recovery.
      // We can remove this block and close the file.
      pendingFile.removeLastBlock(lastBlock);
      finalizeINodeFileUnderConstruction(src, pendingFile,
          iip.getLatestSnapshotId());
      NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
          + "Removed empty last block and closed file.");
      return true;
    }
    // start recovery of the last block for this file
    long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
    lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
    uc.initializeBlockRecovery(blockRecoveryId);
    leaseManager.renewLease(lease);
    // Cannot close file right now, since the last block requires recovery.
    // This may potentially cause infinite loop in lease recovery
    // if there are no valid replicas on data-nodes.
    NameNode.stateChangeLog.warn(
        "DIR* NameSystem.internalReleaseLease: " +
        "File " + src + " has not been closed." +
        " Lease recovery is in progress. " +
        "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
    break;
  }
  return false;
}

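This method mainly decides whether the file really has to enter the block recovery phase, which requires the DataNodes to take part. Its main logic is:

  1. Check whether all blocks of the file are COMPLETE. If they are, call finalizeINodeFileUnderConstruction (the normal file-closing logic, covered in the next section on closing a lease) to close the file and return true. If some block is not COMPLETE, go to step 2.
  2. Check the index of the first block that is not COMPLETE. It may only be the last block or the penultimate block, and if it is the penultimate block, that block must be in the COMMITTED state. If this holds, go to step 3. Otherwise some other block is incomplete and an exception is thrown, which checkLeases catches.
  3. Switch on the state of the last block. If it is COMMITTED and the last two blocks of the file both meet the minimum replication requirement, call finalizeINodeFileUnderConstruction to close the file and return true; otherwise throw an exception. If it is UNDER_CONSTRUCTION or UNDER_RECOVERY and no DataNode has reported the last block, the client most likely crashed before the pipeline was set up; in that case the last block is simply removed from the INode and the file is closed. Otherwise the file enters the block recovery phase (not covered here; it will be analyzed separately).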
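The three steps can be condensed into a pure decision function, which makes the branches easier to see. This is a simplified model, not the NameNode's code: ReleaseDecisionSketch, its enums and the two boolean parameters are hypothetical, and replication and DataNode reports are reduced to flags supplied by the caller.

```java
import java.util.List;

// Hypothetical model of the release decision; names and flags are illustrative only.
final class ReleaseDecisionSketch {
    enum BlockState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }
    enum Outcome { CLOSE_FILE, FAIL, RETRY_LATER, DROP_EMPTY_LAST_BLOCK_AND_CLOSE, START_BLOCK_RECOVERY }

    static Outcome decide(List<BlockState> blocks,
                          boolean lastTwoMinimallyReplicated,
                          boolean lastBlockEmptyAndUnreported) {
        int n = blocks.size();
        int complete = 0;
        while (complete < n && blocks.get(complete) == BlockState.COMPLETE) {
            complete++;
        }
        // Step 1: every block is COMPLETE, so the file can be closed at once.
        if (complete == n) {
            return Outcome.CLOSE_FILE;
        }
        // Step 2: only the last two blocks may be incomplete, and an incomplete
        // penultimate block must be COMMITTED.
        if (complete < n - 2
                || (complete == n - 2 && blocks.get(complete) != BlockState.COMMITTED)) {
            return Outcome.FAIL;
        }
        // Step 3: branch on the state of the last block.
        BlockState last = blocks.get(n - 1);
        if (last == BlockState.COMPLETE) {
            throw new IllegalStateException("last block is expected to be incomplete here");
        }
        if (last == BlockState.COMMITTED) {
            return lastTwoMinimallyReplicated ? Outcome.CLOSE_FILE : Outcome.RETRY_LATER;
        }
        return lastBlockEmptyAndUnreported
                ? Outcome.DROP_EMPTY_LAST_BLOCK_AND_CLOSE
                : Outcome.START_BLOCK_RECOVERY;
    }
}
```

In this model FAIL and RETRY_LATER correspond to the two places where the method throws, and only START_BLOCK_RECOVERY leaves the file open for the recovery phase.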

Closing a lease

After a file has been written, FSDataOutputStream.close() is called to close the output stream. The actual work is delegated to the underlying stream implementation; here we look at DFSOutputStream.close(). close calls completeFile --> NameNodeRpcServer.complete --> FSNamesystem.completeFile --> completeFileInternal --> finalizeINodeFileUnderConstruction --> leaseManager.removeLease, so it is finalizeINodeFileUnderConstruction that calls leaseManager.removeLease to close the lease:

synchronized void removeLease(String holder, String src) {
  // Get the lease that belongs to this holder
  Lease lease = getLease(holder);
  if (lease != null) {
    // Remove the lock on src from the collections
    removeLease(lease, src);
  } else {
    LOG.warn("Removing non-existent lease! holder=" + holder +
        " src=" + src);
  }
}
synchronized void removeLease(Lease lease, String src) {
  // Remove src from sortedLeasesByPath
  sortedLeasesByPath.remove(src);
  // Remove src from the lease's paths
  if (!lease.removePath(src)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(src + " not found in lease.paths (=" + lease.paths + ")");
    }
  }
  // If paths is empty after src has been removed, the lease has no open
  // files left, so remove the lease from leases (that is, close the lease)
  // and then remove it from sortedLeases
  if (!lease.hasPath()) {
    leases.remove(lease.holder);
    if (!sortedLeases.remove(lease)) {
      LOG.error(lease + " not found in sortedLeases");
    }
  }
}

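The logic for closing a lease is fairly simple. Note, however, that closing does not just remove the lease from every collection: it only removes the record for the src being closed. Only when every src record in the lease's paths has been removed can the lease itself be closed.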
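The bookkeeping rule (drop one path at a time, drop the lease only when no paths remain) can be sketched as follows. PathBookkeepingSketch and its methods are hypothetical names, and a plain map from holder to path set stands in for the three collections.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: a lease stays alive while its holder has at least one open path.
final class PathBookkeepingSketch {
    private final Map<String, TreeSet<String>> pathsByHolder = new HashMap<>();

    void addLease(String holder, String src) {
        pathsByHolder.computeIfAbsent(holder, h -> new TreeSet<>()).add(src);
    }

    // Returns true if removing src closed the holder's lease (no open paths left).
    boolean removeLease(String holder, String src) {
        TreeSet<String> paths = pathsByHolder.get(holder);
        if (paths == null) {
            return false;                  // non-existent lease
        }
        paths.remove(src);
        if (paths.isEmpty()) {
            pathsByHolder.remove(holder);  // last path gone: the lease itself is closed
            return true;
        }
        return false;
    }

    boolean hasLease(String holder) {
        return pathsByHolder.containsKey(holder);
    }
}
```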

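Summary

In the normal case, a client must obtain a Lease from the NameNode's LeaseManager before it writes a file to the cluster. While writing, it periodically refreshes the Lease time so that the Lease does not expire, at an interval derived from softLimit. After writing the data, it asks for the Lease to be released.

Two kinds of problems can occur along the way: (1) the client fails to refresh the Lease in time while writing; (2) the Lease is not released successfully after the file has been written. These correspond to softLimit and hardLimit respectively, and both lead LeaseManager to reclaim the Lease forcibly on timeout. If a client fails to refresh its Lease for longer than softLimit, the soft-timeout reclaim is triggered when another client tries to write the same file. If a client finished writing but did not release the Lease, the background thread LeaseManager.Monitor detects the hard timeout and triggers the reclaim. Whichever limit triggers it, the forced reclaim goes through the same logic, FSNamesystem.internalReleaseLease, which is fairly involved and is described in detail above. In short, it first checks and repairs the last Block written before the Lease expired, and then releases the expired Lease, so that other clients can subsequently obtain the Lease on the file and write to it.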