publicvoidrun(){ while (namesystem.isRunning()) { try { // Process replication work only when active NN is out of safe mode. if (namesystem.isPopulatingReplQueues()) { computeDatanodeWork(); processPendingReplications(); } Thread.sleep(replicationRecheckInterval); } catch (Throwable t) { ... } } }
/** * Compute block replication and block invalidation work that can be scheduled * on data-nodes. The datanode will be informed of this work at the next * heartbeat. * * @return number of blocks scheduled for replication or removal. */ intcomputeDatanodeWork(){ // Blocks should not be replicated or removed if in safe mode. // It's OK to check safe mode here w/o holding lock, in the worst // case extra replications will be scheduled, and these will get // fixed up later. if (namesystem.isInSafeMode()) { return0; }
publicsynchronized List<List<Block>> chooseUnderReplicatedBlocks( int blocksToProcess) { // initialize data structure for the return value List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL); for (int i = 0; i < LEVEL; i++) { blocksToReplicate.add(new ArrayList<Block>()); }
if (size() == 0) { // There are no blocks to collect. return blocksToReplicate; } int blockCount = 0; for (int priority = 0; priority < LEVEL; priority++) { // Go through all blocks that need replications with current priority. // 从priorityQueues中取出对应优先级的list,并转换为iterator BlockIterator neededReplicationsIterator = iterator(priority); // priorityToReplIdx是一个map,存放各个优先级中需要复制副本在list中的index // 得到priority开始复制的index Integer replIndex = priorityToReplIdx.get(priority); // skip to the first unprocessed block, which is at replIndex // 从neededReplicationsIterator中跳过找到replIndex对应的block for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) { neededReplicationsIterator.next(); } blocksToProcess = Math.min(blocksToProcess, size()); // blockCount统计复制了多个block,达到blocksToProcess则跳出循环 if (blockCount == blocksToProcess) { break; // break if already expected blocks are obtained } // Loop through all remaining blocks in the list. // 将block添加到blocksToReplicate对应的优先级list中 while (blockCount < blocksToProcess && neededReplicationsIterator.hasNext()) { Block block = neededReplicationsIterator.next(); blocksToReplicate.get(priority).add(block); replIndex++; blockCount++; } // 如果neededReplicationsIterator中没有block,并且此时的优先级为4,即最低优先级 // 则说明block已被复制结束,priorityToReplIdx进行重新赋值 if (!neededReplicationsIterator.hasNext() && neededReplicationsIterator.getPriority() == LEVEL - 1) { // reset all priorities replication index to 0 because there is no // recently added blocks in any list. for (int i = 0; i < LEVEL; i++) { priorityToReplIdx.put(i, 0); } break; } // 如果neededReplicationsIterator中没有block,则进行下一次循环,取出下优先级的list // 将当前优先级复制的index写入priorityToReplIdx priorityToReplIdx.put(priority, replIndex); } return blocksToReplicate; }
intcomputeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate){ int requiredReplication, numEffectiveReplicas; List<DatanodeDescriptor> containingNodes; DatanodeDescriptor srcNode; BlockCollection bc = null; int additionalReplRequired;
int scheduledWork = 0; // 存放需要copy的block列表 List<ReplicationWork> work = new LinkedList<ReplicationWork>(); namesystem.writeLock(); try { synchronized (neededReplications) { // 按照优先级进行复制,先复制高优先级的 for (int priority = 0; priority < blocksToReplicate.size(); priority++) { // 得到对应优先级的block list for (Block block : blocksToReplicate.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append // 如果block被遗弃(该block没有对应的文件)或者该block正在被追加,则不复制 if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); continue; } // 得到expect replication requiredReplication = bc.getBlockReplication(); // get a source data-node // 存放该block所有副本(包括live、corrupt、excess)的datanode // 在chooseSourceDatanode中赋值 containingNodes = new ArrayList<DatanodeDescriptor>(); List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>(); NumberReplicas numReplicas = new NumberReplicas(); // 选择一个复制源点srcDode, srcNode = chooseSourceDatanode( block, containingNodes, liveReplicaNodes, numReplicas, priority); if(srcNode == null) { // block can not be replicated from any node LOG.debug("Block " + block + " cannot be repl from any node"); continue; }
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); // do not schedule more if enough replicas is already pending // 有效副本的个数由live的个数+pending的个数(准备复制的个数) numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); // live+准备复制的个数达到了requiredReplication,不进行再次复制 if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); blockLog.info("BLOCK* Removing " + block + " from neededReplications as it has enough replicas"); continue; } } // additionalReplRequired表示准备要复制的个数 if (numReplicas.liveReplicas() < requiredReplication) { additionalReplRequired = requiredReplication - numEffectiveReplicas; } else { additionalReplRequired = 1; // Needed on a new rack } // 将block、srcDode等信息包装成ReplicationWork,放入work的list中 work.add(new ReplicationWork(block, bc, srcNode, containingNodes, liveReplicaNodes, additionalReplRequired, priority)); } } } } finally { namesystem.writeUnlock(); } // 用于存放block副本所在dn集合,避免选target时选到已有副本的dn final Set<Node> excludedNodes = new HashSet<Node>(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); for (DatanodeDescriptor dn : rw.containingNodes) { excludedNodes.add(dn); } // choose replication targets: NOT HOLDING THE GLOBAL LOCK // It is costly to extract the filename for which chooseTargets is called, // so for now we pass in the block collection itself. rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); }
synchronized (neededReplications) { Block block = rw.block; int priority = rw.priority; // Recheck since global lock was released // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; neededReplications.decrementReplicationIndex(priority); continue; } requiredReplication = bc.getBlockReplication();
// do not schedule more if enough replicas is already pending NumberReplicas numReplicas = countNodes(block); numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block);
if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); rw.targets = null; blockLog.info("BLOCK* Removing " + block + " from neededReplications as it has enough replicas"); continue; } }
if ( (numReplicas.liveReplicas() >= requiredReplication) && (!blockHasEnoughRacks(block)) ) { if (rw.srcNode.getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case continue; } }
// Add block to the to be replicated list // 将复制block的个数放入dn中,用于统计dn上有多少个副本要复制,是否达到上限 rw.srcNode.addBlockToBeReplicated(block, targets); scheduledWork++; DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state. // The reason we use 'pending' is so we can retry // replications that fail after an appropriate amount of time. // 放入pending队列等待调度 pendingReplications.increment(block, DatanodeStorageInfo.toDatanodeDescriptors(targets)); if(blockLog.isDebugEnabled()) { blockLog.debug( "BLOCK* block " + block + " is moved from neededReplications to pendingReplications"); } // remove from neededReplications if(numEffectiveReplicas + targets.length >= requiredReplication) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); } } } } finally { namesystem.writeUnlock(); } ... return scheduledWork; }
// InvalidateBlocks.class synchronized List<Block> invalidateWork(final DatanodeDescriptor dn){ finallong delay = getInvalidationDelay(); if (delay > 0) { if (BlockManager.LOG.isDebugEnabled()) { BlockManager.LOG .debug("Block deletion is delayed during NameNode startup. " + "The deletion will start after " + delay + " ms."); } returnnull; } final LightWeightHashSet<Block> set = node2blocks.get(dn); if (set == null) { returnnull; }
// # blocks that can be sent in one message is limited finalint limit = blockInvalidateLimit; final List<Block> toInvalidate = set.pollN(limit);
// If we send everything in this message, remove this node entry if (set.isEmpty()) { remove(dn); } // 将要删除的block放入dn中 dn.addBlocksToBeInvalidated(toInvalidate); numBlocks -= toInvalidate.size(); return toInvalidate; }