// The soft limit is the lease timeout applied while a file is being written; it defaults to 60s
private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
// The hard limit covers the case where the lease was not released in time when the file was
// closed, so the NameNode forcibly reclaims it; it defaults to 1h
private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;

//
// Used for handling lock-leases
// Mapping: leaseHolder -> Lease
// Maps each lease holder to its lease
private final SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
// Set of: Lease
// Stores all leases granted by the NameNode
private final SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
//
// Map path names to leases. It is protected by the sortedLeases lock.
// The map stores pathnames in lexicographical order.
// Maps each path to its lease
private final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
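The two limits play different roles: once the soft limit has expired, another client may preempt the writer and trigger lease recovery, while the hard limit is the point at which the NameNode itself reclaims the lease. A minimal sketch of how a lease's age can be tested against the two thresholds; the class and method names are illustrative, not the LeaseManager API:

// A minimal sketch (not the HDFS implementation) of comparing a lease's age
// against the soft and hard limits described above.
public class SimpleLease {
  private static final long SOFT_LIMIT_MS = 60 * 1000L;       // 60s, like LEASE_SOFTLIMIT_PERIOD
  private static final long HARD_LIMIT_MS = 60 * 60 * 1000L;  // 1h, like LEASE_HARDLIMIT_PERIOD

  private long lastRenewedMs = System.currentTimeMillis();

  /** Called whenever the holder successfully renews. */
  public void renew() {
    lastRenewedMs = System.currentTimeMillis();
  }

  /** Past the soft limit: another client may trigger lease recovery. */
  public boolean expiredSoftLimit() {
    return System.currentTimeMillis() - lastRenewedMs > SOFT_LIMIT_MS;
  }

  /** Past the hard limit: the NameNode itself reclaims the lease. */
  public boolean expiredHardLimit() {
    return System.currentTimeMillis() - lastRenewedMs > HARD_LIMIT_MS;
  }
}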
/** Get a {@link LeaseRenewer} instance */
// A static method that asks the factory for the LeaseRenewer instance belonging to this user
static LeaseRenewer getInstance(final String authority,
    final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
  final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
  r.addClient(dfsc);
  return r;
}
/**
 * A factory for sharing {@link LeaseRenewer} objects
 * among {@link DFSClient} instances
 * so that there is only one renewer per authority per user.
 */
// Factory class that instantiates LeaseRenewer
private static class Factory {
  private static final Factory INSTANCE = new Factory();

  // A LeaseRenewer instance is uniquely identified by the authority (NameNode info)
  // and the ugi (user info), i.e. one LeaseRenewer per user per NameNode
  private static class Key {
    /** Namenode info */
    final String authority;
    /** User info */
    final UserGroupInformation ugi;
  /** A map for per user per namenode renewers. */
  private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
  /** Get a renewer. */
  private synchronized LeaseRenewer get(final String authority,
      final UserGroupInformation ugi) {
    final Key k = new Key(authority, ugi);
    LeaseRenewer r = renewers.get(k);
    if (r == null) {
      r = new LeaseRenewer(k);
      renewers.put(k, r);
    }
    return r;
  }
  /** Remove the given renewer. */
  private synchronized void remove(final LeaseRenewer r) {
    final LeaseRenewer stored = renewers.get(r.factorykey);
    // Since a renewer may expire, the stored renewer can be different.
    if (r == stored) {
      if (!r.clientsRunning()) {
        renewers.remove(r.factorykey);
      }
    }
  }
}

// Singleton pattern: the constructor is private
private LeaseRenewer(Factory.Key factorykey) {
  this.factorykey = factorykey;
  unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
  ...
}
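The excerpt truncates the Key class, but the sharing only works because two requests from the same user against the same NameNode produce equal keys, so equals()/hashCode() must be defined over both fields. A minimal sketch of such a composite key, with the user field simplified to a String (a hypothetical stand-in, not the actual Factory.Key code):

import java.util.Objects;

// Hypothetical stand-in for LeaseRenewer.Factory.Key: equality over both the
// NameNode authority and the user, so one renewer is shared per (authority, user).
final class RenewerKey {
  final String authority;  // NameNode info
  final String user;       // simplified stand-in for UserGroupInformation

  RenewerKey(String authority, String user) {
    this.authority = authority;
    this.user = user;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof RenewerKey)) return false;
    RenewerKey k = (RenewerKey) o;
    return authority.equals(k.authority) && user.equals(k.user);
  }

  @Override
  public int hashCode() {
    return Objects.hash(authority, user);
  }
}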
private void renew() throws IOException {
  final List<DFSClient> copies;
  synchronized (this) {
    copies = new ArrayList<DFSClient>(dfsclients);
  }
  // sort the client names for finding out repeated names.
  Collections.sort(copies, new Comparator<DFSClient>() {
    @Override
    public int compare(final DFSClient left, final DFSClient right) {
      return left.getClientName().compareTo(right.getClientName());
    }
  });
  String previousName = "";
  for (int i = 0; i < copies.size(); i++) {
    final DFSClient c = copies.get(i);
    // skip if current client name is the same as the previous name.
    if (!c.getClientName().equals(previousName)) {
      // renew the lease
      if (!c.renewLease()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Did not renew lease for client " + c);
        }
        continue;
      }
      previousName = c.getClientName();
      ...
    }
  }
}
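The loop avoids renewing under the same client name twice by sorting the copies first, so equal names become adjacent and only the first occurrence is acted on. A small self-contained sketch of that "sort, then skip equal neighbours" technique:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DedupBySort {
  public static void main(String[] args) {
    List<String> names = new ArrayList<String>(
        Arrays.asList("DFSClient_b", "DFSClient_a", "DFSClient_b", "DFSClient_c"));
    Collections.sort(names);          // equal names become adjacent
    String previous = "";
    for (String name : names) {
      if (!name.equals(previous)) {   // act on the first occurrence only
        System.out.println("renew lease for " + name);
        previous = name;
      }
    }
  }
}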
final List<String> removing = new ArrayList<String>();
// need to create a copy of the oldest lease paths, because
// internalReleaseLease() removes paths corresponding to empty files,
// i.e. it needs to modify the collection being iterated over
// causing ConcurrentModificationException
String[] leasePaths = new String[oldest.getPaths().size()];
oldest.getPaths().toArray(leasePaths);
// Process the paths held by the lease that has exceeded the hard limit
for (String p : leasePaths) {
  try {
    // Release each file under the lease that has exceeded the hard limit
    boolean completed = fsnamesystem.internalReleaseLease(oldest, p,
        HdfsServerConstants.NAMENODE_LEASE_HOLDER);
    ...
    // If a lease recovery happened, we need to sync later.
    if (!needSync && !completed) {
      needSync = true;
    }
  } catch (IOException e) {
    LOG.error("Cannot release the path " + p + " in the lease " + oldest, e);
    removing.add(p);
  }
}
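The copy into an array is what makes the loop safe: iterating a collection while a callee removes elements from it throws ConcurrentModificationException, so the iteration runs over a snapshot instead. A minimal, self-contained illustration of the same pattern:

import java.util.Collection;
import java.util.TreeSet;

public class SnapshotIteration {
  public static void main(String[] args) {
    Collection<String> paths = new TreeSet<String>();
    paths.add("/a"); paths.add("/b"); paths.add("/c");

    // Unsafe: calling paths.remove(p) inside a for-each over paths would throw
    // ConcurrentModificationException on the next iteration step.

    // Safe: take a snapshot, then mutate the original collection freely.
    String[] snapshot = paths.toArray(new String[0]);
    for (String p : snapshot) {
      if (p.equals("/b")) {
        paths.remove(p);   // stands in for internalReleaseLease() removing a path
      }
    }
    System.out.println(paths);   // prints [/a, /c]
  }
}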
int nrCompleteBlocks;
BlockInfo curBlock = null;
// Find the first block of this file that is not yet COMPLETE
for (nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
  curBlock = blocks[nrCompleteBlocks];
  if (!curBlock.isComplete())
    break;
  assert blockManager.checkMinReplication(curBlock) :
      "A COMPLETE block is not minimally replicated in " + src;
}
// If there are no incomplete blocks associated with this file,
// then reap lease immediately and close the file.
// If all blocks are COMPLETE, close the file and release the lease right away
if (nrCompleteBlocks == nrBlocks) {
  finalizeINodeFileUnderConstruction(src, pendingFile,
      iip.getLatestSnapshotId());
  NameNode.stateChangeLog.warn("BLOCK*"
      + " internalReleaseLease: All existing blocks are COMPLETE,"
      + " lease removed, file closed.");
  return true;  // closed!
}

// Only the last and the penultimate blocks may be in non COMPLETE state.
// If the penultimate block is not COMPLETE, then it must be COMMITTED.
// If an incomplete block exists, it may only be the last block or the penultimate
// block, and when it is the penultimate block, that block must be in the COMMITTED
// state. Any other incomplete block is an error, so an exception is thrown here
// and caught in checkLeases.
if (nrCompleteBlocks < nrBlocks - 2 ||
    nrCompleteBlocks == nrBlocks - 2 &&
        curBlock != null &&
        curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
  final String message = "DIR* NameSystem.internalReleaseLease: "
      + "attempt to release a create lock on "
      + src + " but file is already closed.";
  NameNode.stateChangeLog.warn(message);
  throw new IOException(message);
}
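The index arithmetic in that condition is easy to misread, so here is a hedged, simplified restatement: with nrBlocks blocks, recovery only proceeds when every block before the penultimate one is COMPLETE, and a non-COMPLETE penultimate block is COMMITTED. The enum and method names below are illustrative, not the HDFS types:

public class LayoutCheck {
  enum State { COMPLETE, COMMITTED, UNDER_CONSTRUCTION }

  /** states[i] is the state of block i; returns true if lease release may continue. */
  static boolean recoverable(State[] states) {
    int nrBlocks = states.length;
    int nrComplete = 0;
    while (nrComplete < nrBlocks && states[nrComplete] == State.COMPLETE) {
      nrComplete++;
    }
    if (nrComplete == nrBlocks) {
      return true;                       // nothing to recover, the file can simply close
    }
    if (nrComplete < nrBlocks - 2) {
      return false;                      // an incomplete block before the penultimate one
    }
    if (nrComplete == nrBlocks - 2) {    // penultimate block is the first incomplete one
      return states[nrComplete] == State.COMMITTED;
    }
    return true;                         // only the last block is incomplete
  }

  public static void main(String[] args) {
    System.out.println(recoverable(new State[] {
        State.COMPLETE, State.COMMITTED, State.UNDER_CONSTRUCTION }));  // true
    System.out.println(recoverable(new State[] {
        State.UNDER_CONSTRUCTION, State.COMPLETE, State.COMPLETE }));   // false
  }
}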
// The last block is not COMPLETE, and
// that the penultimate block if exists is either COMPLETE or COMMITTED
final BlockInfo lastBlock = pendingFile.getLastBlock();
BlockUCState lastBlockState = lastBlock.getBlockUCState();
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
// If penultimate block doesn't exist then its minReplication is met
boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
    blockManager.checkMinReplication(penultimateBlock);
switch (lastBlockState) {
case COMPLETE:
  assert false : "Already checked that the last block is incomplete";
  break;
case COMMITTED:
  // Close file if committed blocks are minimally replicated
  if (penultimateBlockMinReplication &&
      blockManager.checkMinReplication(lastBlock)) {
    finalizeINodeFileUnderConstruction(src, pendingFile,
        iip.getLatestSnapshotId());
    NameNode.stateChangeLog.warn("BLOCK*"
        + " internalReleaseLease: Committed blocks are minimally replicated,"
        + " lease removed, file closed.");
    return true;  // closed!
  }
  // Cannot close file right now, since some blocks
  // are not yet minimally replicated.
  // This may potentially cause infinite loop in lease recovery
  // if there are no valid replicas on data-nodes.
  String message = "DIR* NameSystem.internalReleaseLease: "
      + "Failed to release lease for file " + src
      + ". Committed blocks are waiting to be minimally replicated."
      + " Try again later.";
  NameNode.stateChangeLog.warn(message);
  throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
  final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction) lastBlock;
  // setup the last block locations from the blockManager if not known
  if (uc.getNumExpectedLocations() == 0) {
    uc.setExpectedLocations(blockManager.getStorages(lastBlock));
  }
  if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
    // There is no datanode reported to this block.
    // Maybe the client crashed before writing data to the pipeline.
    // This block doesn't need any recovery.
    // We can remove this block and close the file.
    pendingFile.removeLastBlock(lastBlock);
    finalizeINodeFileUnderConstruction(src, pendingFile,
        iip.getLatestSnapshotId());
    NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
        + "Removed empty last block and closed file.");
    return true;
  }
  // start recovery of the last block for this file
  long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
  lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
  uc.initializeBlockRecovery(blockRecoveryId);
  leaseManager.renewLease(lease);
  // Cannot close file right now, since the last block requires recovery.
  // This may potentially cause infinite loop in lease recovery
  // if there are no valid replicas on data-nodes.
  NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseLease: "
      + "File " + src + " has not been closed."
      + " Lease recovery is in progress. "
      + "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
  break;
}
return false;
}
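To recap the switch above, lease release ends in one of three ways: the file is closed immediately, the caller is told to retry later, or block recovery is started and the file stays open until the DataNodes finish. A hedged summary sketch (the names below are illustrative, not the HDFS API):

public class ReleaseOutcome {
  enum Outcome { FILE_CLOSED, RETRY_LATER, RECOVERY_STARTED }

  static Outcome describe(boolean lastBlockCommitted, boolean minimallyReplicated,
                          boolean lastBlockEmptyAndUnreported) {
    if (lastBlockCommitted) {
      // COMMITTED: close if replication is sufficient; otherwise an
      // AlreadyBeingCreatedException is thrown and the caller retries later
      return minimallyReplicated ? Outcome.FILE_CLOSED : Outcome.RETRY_LATER;
    }
    if (lastBlockEmptyAndUnreported) {
      // UNDER_CONSTRUCTION with no data and no reported replicas:
      // drop the empty last block and close the file
      return Outcome.FILE_CLOSED;
    }
    // Otherwise a new generation stamp is issued and block recovery begins;
    // the file is not closed until recovery completes.
    return Outcome.RECOVERY_STARTED;
  }
}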